• 延时消息队列


    目录

    前言

    一、延时队列实用场景

    二、DelayQueue 

    DelayQueue的实现

    使用延迟队列 

     DelayQueue实现延时任务的优缺点

    三、RocketMQ

    原理

    四、Kafka

    原理

    实现 

    DelayMessage定义

    消息发送代码 

    消费者代码 

    参考


    前言

    延时队列的内部是有序的,最重要的特性就体现在它的延时属性上,延时队列就是用来存放需要在指定时间点被处理的元素的队列

    队列是存储消息的载体,延时队列存储的对象是延时消息。所谓的延时消息,是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费。


    一、延时队列实用场景

    • 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将货款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能;

    • 订单在三十分钟之内未支付则自动取消;

    • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒;

    • 用户注册成功后,如果三天内没有登陆则进行短信提醒

    • 用户发起退款,如果三天内没有得到处理则通知相关运营人员;

    • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

    二、DelayQueue 

    1. public class DelayQueueextends Delayed> extends AbstractQueue
    2. implements BlockingQueue {
    3. }

    DelayQueue是一个无界的BlockingQueue,是线程安全的(无界指的是队列的元素数量不存在上限,队列的容量会随着元素数量的增加而扩容,阻塞队列指的是当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常)

    以上是阻塞队列的特点,而延迟队列还拥有自己如下的特点:

    DelayQueue中存入的必须是实现了Delayed接口的对象(Delayed定义了一个getDelay的方法,用来判断排序后的元素是否可以从Queue中取出,并且Delayed接口还继承了Comparable用于排序),插入Queue中的数据根据compareTo方法进行排序(DelayQueue的底层存储是一个PriorityQueue,PriorityQueue是一个可排序的Queue,其中的元素必须实现Comparable接口的compareTo方法),并通过getDelay方法返回的时间确定元素是否可以出队,只有小于等于0的元素(即延迟到期的元素)才能够被取出

    延迟队列不接收null元素

    DelayQueue的实现

    1. public class UserDelayTask implements Delayed {
    2. @Getter
    3. private UserRegisterMessage message;
    4. private long delayTime;
    5. public UserDelayTask(UserRegisterMessage message, long delayTime) {
    6. this.message = message;
    7. // 延迟时间加当前时间
    8. this.delayTime = System.currentTimeMillis() + delayTime;
    9. }
    10. // 获取任务剩余时间
    11. @Override
    12. public long getDelay(TimeUnit unit) {
    13. return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    14. }
    15. @Override
    16. public int compareTo(Delayed o) {
    17. return Long.compare(delayTime, ((UserDelayTask) o).delayTime);
    18. }
    19. }

    定义延迟队列并交付容器管理 

    1. /**
    2. * 延迟队列
    3. */
    4. @Bean("userDelayQueue")
    5. public DelayQueue orderDelayQueue() {
    6. return new DelayQueue();
    7. }

    使用延迟队列 

    1. @Resource
    2. private DelayQueue orderDelayQueue;
    3. UserDelayTask task = new UserDelayTask(message, 1000 * 60);
    4. orderDelayQueue.add(task);

    开启线程处理延迟任务

    1. @Override
    2. public void afterPropertiesSet() throws Exception {
    3. new Thread(() -> {
    4. while (true) {
    5. try {
    6. UserDelayTask task = orderDelayQueue.take();
    7. // 当队列为null的时候,poll()方法会直接返回null, 不会抛出异常,但是take()方法会一直等待,
    8. // 因此会抛出一个InterruptedException类型的异常。(当阻塞方法收到中断请求的时候就会抛出InterruptedException异常)
    9. UserRegisterMessage message = task.getMessage();
    10. execute(message);
    11. // 执行业务
    12. } catch (Exception ex) {
    13. log.error("afterPropertiesSet", ex);
    14. }
    15. }
    16. }).start();
    17. }

     DelayQueue实现延时任务的优缺点

    使用DelayQueue实现延时任务非常简单,而且简便,全部都是标准的JDK代码实现,不用引入第三方依赖(不依赖redis实现、消息队列实现等),非常的轻量级。

    它的缺点就是所有的操作都是基于应用内存的,一旦出现应用单点故障,可能会造成延时任务数据的丢失。如果订单并发量非常大,因为DelayQueue是无界的,订单量越大,队列内的对象就越多,可能造成OOM的风险。所以使用DelayQueue实现延时任务,只适用于任务量较小的情况。
     

    三、RocketMQ

    RocketMQ 和本身就有延迟队列的功能,但是开源版本只能支持固定延迟时间的消息,不支持任意时间精度的消息(这个好像只有阿里云版本的可以)。

    他的默认时间间隔分为 18 个级别,基本上也能满足大部分场景的需要了。

    默认延迟级别:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。

    使用起来也非常的简单,直接通过setDelayTimeLevel设置延迟级别即可。

    setDelayTimeLevel(level)

    原理

    实现原理说起来比较简单,Broker 会根据不同的延迟级别创建出多个不同级别的队列,当我们发送延迟消息的时候,根据不同的延迟级别发送到不同的队列中,同时在 Broker 内部通过一个定时器去轮询这些队列(RocketMQ 会为每个延迟级别分别创建一个定时任务),如果消息达到发送时间,那么就直接把消息发送到指 topic 队列中。

    RocketMQ 这种实现方式是放在服务端去做的,同时有个好处就是相同延迟时间的消息是可以保证有序性的。

    谈到这里就顺便提一下关于消息消费重试的原理,这个本质上来说其实是一样的,对于消费失败需要重试的消息实际上都会被丢到延迟队列的 topic 里,到期后再转发到真正的 topic 中。

    四、Kafka

    对于 Kafka 来说,原生并不支持延迟队列的功能,需要我们手动去实现,这里我根据 RocketMQ 的设计提供一个实现思路。

    这个设计,我们也不支持任意时间精度的延迟消息,只支持固定级别的延迟,因为对于大部分延迟消息的场景来说足够使用了。

    只创建一个 topic,但是针对该 topic 创建 18 个 partition,每个 partition 对应不同的延迟级别,这样做和 RocketMQ 一样有个好处就是能达到相同延迟时间的消息达到有序性。

    原理

    • 首先创建一个单独针对延迟队列的 topic,同时创建 18 个 partition 针对不同的延迟级别
    • 发送消息的时候根据消息延迟等级发送到延迟 topic 对应的 partition,同时把原 topic 保存到 延迟消息 中。
    • 内嵌的consumer单独设置一个ConsumerGroup去消费延迟 topic 消息,消费到消息之后如果没有达到延迟时间那么就进行pause,然后seek到当前ConsumerRecordoffset位置,同时使用定时器去轮询延迟的TopicPartition,达到延迟时间之后进行resume。

           KafkaConsumer 提供了暂停和恢复的API函数,调用消费者的暂停方法后就无法再拉取到新的消息,同时长时间不消费kafka也不会认为这个消费者已经挂掉了。

    • 如果达到了延迟时间,那么就获取到延迟消息中的真实 topic ,直接转发

    这里为什么要进行pauseresume呢?因为如果不这样的话,如果超时未消费达到max.poll.interval.ms 最大时间(默认300s),那么将会触发 Rebalance。

    实现 

    DelayMessage定义
    1. /**
    2. * 延迟消息
    3. *
    4. * @author yangyanping
    5. * @date 2023-08-31
    6. */
    7. @Getter
    8. @Setter
    9. @ToString
    10. public class DelayMessage implements DTO {
    11. /**
    12. * 消息级别,共18个,对应18个partition
    13. */
    14. private Integer level;
    15. /**
    16. * 业务类型,真实投递到的topic
    17. */
    18. private String topic;
    19. /**
    20. * 目标消息key
    21. */
    22. private String key;
    23. /**
    24. * 事件
    25. */
    26. private DomainEvent event;
    27. }
    消息发送代码 
    1. public void publishAsync(DelayMessage delayMessage) {
    2. String topic = "delay_topic";
    3. try {
    4. Integer level = delayMessage.getLevel();
    5. Integer delayPartition = level - 1;
    6. String data = JSON.toJSONString(delayMessage);
    7. ProducerRecord producerRecord = new ProducerRecord<>(topic, delayPartition, "", data);
    8. ListenableFuture> future = kafkaTemplate.send(producerRecord);
    9. future.addCallback(new ListenableFutureCallback>() {
    10. @Override
    11. public void onSuccess(SendResult result) {
    12. //发送成功后回调
    13. log.info("{}-异步发送成功, result={}。", topic, result.getRecordMetadata().toString());
    14. }
    15. @Override
    16. public void onFailure(Throwable throwable) {
    17. //发送失败回调
    18. log.error("{}-异步发送失败。", topic, throwable);
    19. }
    20. });
    21. } catch (Exception ex) {
    22. log.error("{}-异步发送异常。", topic, ex);
    23. }
    24. }
    消费者代码 
    1. import cn.hutool.core.collection.CollectionUtil;
    2. import cn.hutool.core.convert.Convert;
    3. import com.alibaba.fastjson.JSON;
    4. import com.alibaba.fastjson.JSONObject;
    5. import com.xinwu.shushan.exception.BizException;
    6. import com.xinwu.shushan.launch.infra.publisher.KafkaPublisher;
    7. import lombok.extern.slf4j.Slf4j;
    8. import org.apache.kafka.clients.consumer.Consumer;
    9. import org.apache.kafka.clients.consumer.ConsumerRecord;
    10. import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    11. import org.apache.kafka.common.TopicPartition;
    12. import org.springframework.kafka.annotation.KafkaListener;
    13. import org.springframework.kafka.listener.ConsumerSeekAware;
    14. import org.springframework.stereotype.Component;
    15. import javax.annotation.PostConstruct;
    16. import javax.annotation.Resource;
    17. import java.util.*;
    18. import java.util.concurrent.Executors;
    19. import java.util.concurrent.ScheduledExecutorService;
    20. import java.util.concurrent.TimeUnit;
    21. /**
    22. * 参考RocketMQ支持延迟消息设计,不支持任意时间精度的延迟消息,只支持特定级别的延迟消息,
    23. * 将消息延迟等级分为1s、5s、10s 、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h,共18个级别,
    24. * 只创建一个有18个分区的延时topic,每个分区对应不同延时等级。
    25. *

    26. * https://blog.csdn.net/weixin_40270946/article/details/121293032
    27. *

    28. * https://zhuanlan.zhihu.com/p/365802989
    29. *
    30. * @author yangyanping
    31. * @date 2023-08-30
    32. */
    33. @Slf4j
    34. @Component
    35. public class DelayConsumer implements ConsumerSeekAware {
    36. /**
    37. * 锁
    38. */
    39. private final Object lock = new Object();
    40. /**
    41. * 间隔
    42. */
    43. private final int interval = 5000;
    44. private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    45. /**
    46. * 消费者
    47. */
    48. private volatile Consumer kafkaConsumer;
    49. @Resource
    50. private KafkaPublisher kafkaPublisher;
    51. @PostConstruct
    52. public void init() {
    53. //当系统需要循环间隔一定时间执行某项任务的时候可以使用scheduleWithFixedDelay方法来实现
    54. executorService.scheduleWithFixedDelay(() -> {
    55. synchronized (lock) {
    56. resume();
    57. lock.notifyAll();
    58. log.info("DelayConsumer-notifyAll");
    59. }
    60. }, 0, interval, TimeUnit.MILLISECONDS);
    61. }
    62. /**
    63. * 批量消费消息
    64. */
    65. @KafkaListener(topics = "#{'${shushan.launch.event.delayTopic.topic}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}", concurrency = "1")
    66. public void onMessage(List> records, Consumer consumer) {
    67. synchronized (lock) {
    68. try {
    69. /**
    70. * 7、ConcurrentModificationException
    71. * java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    72. * 报错内容:线程不安全
    73. *
    74. * 原因分析:Kafka consumer是非线程安全的
    75. */
    76. this.kafkaConsumer = consumer;
    77. if (CollectionUtil.isEmpty(records)) {
    78. log.info("DelayConsumer-records is empty !");
    79. consumer.commitSync();
    80. return;
    81. }
    82. boolean delay = false;
    83. for (ConsumerRecord record : records) {
    84. long timestamp = record.timestamp();
    85. String value = record.value();
    86. JSONObject jsonObject = JSON.parseObject(value);
    87. Integer level = Convert.toInt(jsonObject.get("level"));
    88. String targetTopic = Convert.toStr(jsonObject.get("topic"));
    89. String event = Convert.toStr(jsonObject.get("event"));
    90. String msgKey = Convert.toStr(jsonObject.get("key"));
    91. TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
    92. long delayTime = getDelayTime(timestamp, level);
    93. if (delayTime <= System.currentTimeMillis()) {
    94. log.info("DelayConsumer-delayTime={} <= currentTime={},msgKey={}", delayTime, System.currentTimeMillis(), msgKey);
    95. // 处理消息
    96. processMessage(record, consumer, topicPartition, targetTopic, msgKey, event);
    97. } else {
    98. log.info("DelayConsumer-delayTime={} > currentTime={},msgKey={},partition={}", delayTime, System.currentTimeMillis(), msgKey, topicPartition.partition());
    99. // 暂停消费
    100. consumer.pause(Collections.singletonList(topicPartition));
    101. consumer.seek(topicPartition, record.offset());
    102. delay = true;
    103. break;
    104. }
    105. }
    106. if (delay) {
    107. lock.wait();
    108. }
    109. } catch (Exception var10) {
    110. log.error("DelayConsumer.onMessage error", var10);
    111. throw new BizException("事件消息消费失败", var10);
    112. }
    113. }
    114. }
    115. /**
    116. * 消息级别,共6个
    117. * level-1 :15s
    118. * level-2 : 30s
    119. * level-3 : 1m
    120. * level-4 : 5m
    121. * level-5 : 10m
    122. * level-6 : 30m
    123. */
    124. private Long getDelayTime(long timestamp, Integer level) {
    125. switch (level) {
    126. case 1:
    127. return timestamp + 25 * 1000;
    128. case 2:
    129. return timestamp + 30 * 1000;
    130. case 3:
    131. return timestamp + 1 * 60 * 1000;
    132. case 4:
    133. return timestamp + 5 * 60 * 1000;
    134. case 5:
    135. return timestamp + 10 * 60 * 1000;
    136. case 6:
    137. return timestamp + 30 * 60 * 1000;
    138. }
    139. return timestamp;
    140. }
    141. /**
    142. * 处理消息 并提交消息
    143. */
    144. private void processMessage(ConsumerRecord record, Consumer consumer, TopicPartition topicPartition, String targetTopic, String msgKey, String event) {
    145. OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
    146. HashMap metadataHashMap = new HashMap<>();
    147. metadataHashMap.put(topicPartition, offsetAndMetadata);
    148. //发布目标
    149. kafkaPublisher.publishSync(targetTopic, msgKey, event);
    150. log.info("DelayConsumer-records#offset={},targetTopic={},event={}", record.offset() + 1, targetTopic, event);
    151. consumer.commitSync(metadataHashMap);
    152. }
    153. /**
    154. * 重启消费
    155. */
    156. private void resume() {
    157. try {
    158. if (kafkaConsumer == null) {
    159. return;
    160. }
    161. Set topicPartitions = kafkaConsumer.paused();
    162. if (CollectionUtil.isEmpty(topicPartitions)) {
    163. return;
    164. }
    165. kafkaConsumer.resume(topicPartitions);
    166. } catch (Exception ex) {
    167. log.error("DelayConsumer-resume", ex);
    168. }
    169. }
    170. }

    参考

    RabbitMQ、RocketMQ、Kafka延迟队列实现-腾讯云开发者社区-腾讯云

    延迟消息队列设计-腾讯云开发者社区-腾讯云

    用Kafka实现延迟消息_kafka延迟消费_alvin.yao的博客-CSDN博客

    怎么设计一个合适的延时队列?

    基于kafka实现延迟队列 - 知乎

  • 相关阅读:
    排序算法—
    keep-alive的使用场景跟原理分析(超详细讲解)
    区块链(9):java区块链项目的Web服务实现之实现web服务
    服务器重装思路
    电脑d盘不见了怎么恢复?
    【学生网页作业】航海王动漫网页 html+ css + JavaScript 简单的学生网页作业源码
    elmentUI多级菜单动态显示
    音视频封装demo:将h264数据和aac数据封装(mux)成TS文件(纯手工,不依赖第三方开源库)
    【神经网络】Dropout原理
    ChatGPT/GPT4科研技术应用与AI绘图及论文高效写作
  • 原文地址:https://blog.csdn.net/yangyanping20108/article/details/132630909