• springboot+redis实现延迟队列(内含Redisson版本)


    一:利用redis的zset实现消息队列

    使用场景
    1、下单成功,30分钟未支付。支付超时,自动取消订单

    2、订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评

    3、下单成功,商家5分钟未接单,订单取消

    4、配送超时,推送短信提醒

    ......

    对于延时比较长的场景、实时性不高的场景,我们可以采用任务调度的方式定时轮询处理。如:xxl-job

    今天我们采用一种比较简单、轻量级的方式,使用 Redis 的延迟队列来进行处理。当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案。如:使用消息中间件Kafka、RabbitMQ 的延迟队列

    原理:开启一个守护线程对消息进行轮询消费,利用zset的有序性,ZSet 有一个 Score 属性可以用来存储延迟执行的时间,将到达过期时间的消息去除,不能的实现类处理不同的业务。

    1:延迟消息实体类

    1. import com.yx.constant.IdWorker;
    2. import java.io.Serializable;
    3. /**
    4. * 延迟消息实体类
    5. */
    6. public class DelayMessage implements Serializable {
    7. /**
    8. * 队列id 用原子类生产
    9. */
    10. private String seqId;
    11. /**
    12. * 消息主体
    13. */
    14. private Object body;
    15. /**
    16. * 消息消费的时间戳
    17. */
    18. private long expire;
    19. public DelayMessage(){}
    20. public DelayMessage(Object body, long expire) {
    21. this.seqId = IdWorker.getNextIdStr();
    22. this.body = body;
    23. this.expire = expire;
    24. }
    25. public DelayMessage(Object body, int delay) {
    26. this(body, System.currentTimeMillis() + delay);
    27. }
    28. public String getSeqId() {
    29. return seqId;
    30. }
    31. public Object getBody() {
    32. return body;
    33. }
    34. public long getExpire() {
    35. return expire;
    36. }

    2:延迟队列类

    1. import org.springframework.beans.factory.annotation.Autowired;
    2. import org.springframework.data.redis.core.RedisTemplate;
    3. import org.springframework.data.redis.core.ZSetOperations;
    4. import org.springframework.stereotype.Component;
    5. import java.lang.reflect.Array;
    6. import java.util.Set;
    7. /**
    8. * 延迟队列类
    9. */
    10. @Component
    11. public class DelayQueue {
    12. @Autowired
    13. private RedisTemplate redisTemplate;
    14. /**
    15. * 加入一个消息
    16. * @param queueName 队列名称
    17. * @param message 延迟消息类
    18. */
    19. public void add(String queueName, DelayMessage message) {
    20. redisTemplate.opsForZSet().add(queueName, message, message.getExpire());
    21. synchronized (this) {
    22. //加入一个消息后唤醒所有睡眠的线程
    23. this.notifyAll();
    24. }
    25. }
    26. /**
    27. * 取出一个消息
    28. * @param queueName 队列名称
    29. * @return
    30. * @throws InterruptedException
    31. */
    32. public DelayMessage take(String queueName) throws InterruptedException {
    33. while (true) {
    34. ZSetOperations.TypedTuple<DelayMessage> tuple = getFirst(queueName);
    35. if (tuple == null) {
    36. synchronized (this) {
    37. //如果该队列为空则休眠该线程
    38. this.wait();
    39. tuple = getFirst(queueName);
    40. }
    41. }
    42. if (tuple != null){
    43. //消息到期时间戳
    44. long expire = tuple.getScore().longValue();
    45. //延迟时间
    46. long delay = expire - System.currentTimeMillis();
    47. //未到达消费时间
    48. if (delay > 0) {
    49. synchronized (this) {
    50. //让线程休眠delay毫秒
    51. this.wait(delay);
    52. }
    53. }
    54. /*
    55. 这里判断消息消费时间是否大于当前时间的原因:
    56. 当该线程执行上一步 this.wait(delay);
    57. 正好有消息消费在休眠准备消费时;
    58. 有消息加入执行了add 加入消息,此时唤醒了休眠的线程
    59. 所有这里再做一次判断,防止未到时间消息就消费了
    60. */
    61. if (System.currentTimeMillis() >= expire) {
    62. //取出消息
    63. DelayMessage delayMessage = tuple.getValue();
    64. //把消息 id取出 建立一个key
    65. String lockKey = queueName + "_" + delayMessage.getSeqId() + "_lock";
    66. //setIfAbsent的作用是:
    67. //如果该key有值1,则返回false
    68. //如果该key没有值为1,则返回ture而且再为key加上值为1
    69. Boolean ok = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");
    70. if (ok) {
    71. //移除redis之前存的消息
    72. redisTemplate.opsForZSet().remove(queueName, delayMessage);
    73. //删除刚刚为了保证原子性的key
    74. redisTemplate.delete(lockKey);
    75. //返回消息主体
    76. return delayMessage;
    77. }
    78. }
    79. }
    80. }
    81. }
    82. /**
    83. * 得到该队列的第一条数据
    84. * @param queueName 队列名称
    85. * @return
    86. */
    87. private ZSetOperations.TypedTuple getFirst(String queueName) {
    88. //获取队列名称为 queueName的所有的消息
    89. Set<ZSetOperations.TypedTuple<DelayMessage>> tuples = redisTemplate.opsForZSet().rangeWithScores(queueName, 0, -1);
    90. if (tuples.size() == 0) {
    91. return null;
    92. }
    93. ZSetOperations.TypedTuple[] array = (ZSetOperations.TypedTuple[]) Array.newInstance(ZSetOperations.TypedTuple.class, tuples.size());
    94. return tuples.toArray(array)[0];
    95. }

    3:延迟消息消费类 开启一个线程进行消息的消费

    1. import org.springframework.beans.factory.annotation.Autowired;
    2. import org.springframework.boot.ApplicationArguments;
    3. import org.springframework.boot.ApplicationRunner;
    4. /**
    5. * 延迟消息消费类 开启一个线程进行消息的消费
    6. */
    7. public abstract class DelayQueueConsumer implements ApplicationRunner {
    8. @Autowired
    9. private DelayQueue queue;
    10. public abstract String getQueueName();
    11. public abstract void consume(DelayMessage message);
    12. public abstract void catchInterruptedException(InterruptedException e);
    13. @Override
    14. public void run(ApplicationArguments args) throws Exception {
    15. //获取队列名称
    16. String queueName = getQueueName();
    17. if (queueName == null) {
    18. throw new NullPointerException("the return value of getQueueName method cannot be null");
    19. }
    20. Thread thread = new Thread(()->{
    21. while (true) {
    22. try {
    23. //获取消息
    24. DelayMessage message = queue.take(queueName);
    25. //消费消息
    26. this.consume(message);
    27. } catch (Exception e) {
    28. e.printStackTrace();
    29. }
    30. }
    31. });
    32. thread.setDaemon(true);
    33. thread.start();
    34. }

    4:示例

    1. @Autowired
    2. private DelayQueue queue;
    3. @GetMapping("add")
    4. public String addQueue(@RequestParam Map<String, Object> params){
    5. long millis = System.currentTimeMillis() + 30000;
    6. DelayMessage delayMessage = new DelayMessage(params, millis);
    7. queue.add("remind",delayMessage);
    8. SimpleDateFormat format = new SimpleDateFormat("hh:mm:ss");
    9. String time = format.format(new Date(millis));
    10. return time;
    11. }
    1. import java.text.SimpleDateFormat;
    2. import java.util.Map;
    3. /**
    4. * @ClassName MessageTask
    5. * @Author yangxi
    6. * @Date 2019/9/8 10:42
    7. * @Description desc
    8. **/
    9. @Component
    10. public class MessageTask extends DelayQueueConsumer {
    11. @Override
    12. public String getQueueName() {
    13. return "remind";
    14. }
    15. @Override
    16. public void consume(DelayMessage message) {
    17. Map body = (Map) message.getBody();
    18. String name = body.get("name").toString();
    19. String age = body.get("age").toString();
    20. SimpleDateFormat format = new SimpleDateFormat("hh:mm:ss");
    21. String newTime = format.format(System.currentTimeMillis());
    22. System.out.println("当前时间"+ newTime + age + "||| -.- |||" + name);
    23. }
    24. @Override
    25. public void catchInterruptedException(InterruptedException e) {
    26. }

    二:利用redisson的RBlockingQueue实现消息队列

    1:基于redisson实现的延时消息管理器

    1. import cn.hutool.core.thread.ThreadUtil;
    2. import cn.hutool.extra.spring.SpringUtil;
    3. import com.yx.handler.DelayMessageHandler;
    4. import com.yx.message.DelayMessage;
    5. import com.yx.message.DelayMessageType;
    6. import lombok.extern.slf4j.Slf4j;
    7. import org.redisson.Redisson;
    8. import org.redisson.api.RBlockingQueue;
    9. import org.redisson.api.RDelayedQueue;
    10. import org.redisson.api.RedissonClient;
    11. import org.springframework.stereotype.Component;
    12. import java.util.Arrays;
    13. import java.util.Map;
    14. import java.util.concurrent.ConcurrentHashMap;
    15. /**
    16. * @description 基于redisson实现的延时消息管理器
    17. */
    18. @Component
    19. @Slf4j
    20. public class RedissonDelayMessageManager implements DelayMessageManager {
    21. private RedissonClient redissonClient = Redisson.create();
    22. private RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque("redisson-delay-message-queue");
    23. private RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
    24. private final Map<DelayMessageType, DelayMessageHandler> handlerMap = new ConcurrentHashMap<>(16);
    25. @Override
    26. public void add(DelayMessage message) {
    27. if (rDelayedQueue.contains(message)) {
    28. return;
    29. }
    30. log.info("redisson-delay-message-queue,add message = {}", message);
    31. rDelayedQueue.offer(message, message.getProperties().getExpire(), message.getProperties().getTimeUnit());
    32. }
    33. @Override
    34. public boolean remove(DelayMessage message) {
    35. return rDelayedQueue.remove(message);
    36. }
    37. @Override
    38. public void destroy() {
    39. rDelayedQueue.destroy();
    40. }
    41. @Override
    42. public void afterPropertiesSet() {
    43. Arrays.stream(DelayMessageType.values()).forEach(delayMessageType -> handlerMap.put(delayMessageType, SpringUtil.getBean(delayMessageType.getHandler())));
    44. Thread thread = new Thread(() -> {
    45. while (true) {
    46. try {
    47. DelayMessage delayMessage = rBlockingQueue.take();
    48. log.info("redisson-delay-message-queue,consume message = {}", delayMessage);
    49. handlerMap.get(delayMessage.getType()).handle(delayMessage);
    50. } catch (InterruptedException e) {
    51. e.printStackTrace();
    52. ThreadUtil.safeSleep(1000);
    53. afterPropertiesSet();
    54. }
    55. }
    56. });
    57. thread.setDaemon(true);
    58. thread.start();
    59. }

    2:延时消息处理器接口

    1. /**
    2. * @description 延时消息处理器接口
    3. */
    4. public interface DelayMessageHandler {
    5. /**
    6. * 处理消息
    7. *
    8. * @param message 消息
    9. */
    10. void handle(DelayMessage message);
    11. }

    3:示例

    1. @Resource(type = RedissonDelayMessageManager.class)
    2. private RedissonDelayMessageManager delayMessageManager;
    3. @GetMapping("add")
    4. private String add() {
    5. DelayMessage delayMessage1 = new DelayMessage();
    6. QrCode qrCode1 = new QrCode();
    7. qrCode1.setConfigId("123");
    8. qrCode1.setUrl("http://www.baidu.com");
    9. delayMessage1.setBody(JSONUtil.toJsonStr(qrCode1));
    10. delayMessage1.setType(DelayMessageType.DELETE_QR_CODE);
    11. DelayMessage.DelayMessageProperties properties1 = new DelayMessage.DelayMessageProperties();
    12. properties1.setExpire(1239);
    13. properties1.setTimeUnit(TimeUnit.MILLISECONDS);
    14. delayMessage1.setProperties(properties1);
    15. delayMessageManager.add(delayMessage1);
    16. return "success";
    17. }
    1. /**
    2. * @description 任务消息处理器
    3. */
    4. @Component
    5. @Slf4j
    6. public class ExecuteTaskDelayMessageHandler implements DelayMessageHandler {
    7. @Override
    8. public void handle(DelayMessage message) {
    9. log.info("任务延时消息处理中,message={}", message);
    10. System.out.println(DateUtil.now() + "" + JSONUtil.toJsonStr(message));
    11. }
    12. }

    三:项目demo地址

     delay-message-redis-demo

  • 相关阅读:
    辅助驾驶功能开发-功能规范篇(16)-2-领航辅助系统NAP-功能ODD定义
    IaC实战指南:DevOps的自动化基石
    C 语言的标识符,保留标识符,关键字
    一个纯Python构建的Web应用框架
    【JVM】类的生命周期
    记一次 HTTPS 抓包分析和 SNI 的思考
    硬件新问答
    js异步编程-题目29 实现异步任务执行器 AsyncWorker
    2023年内网穿透常用的几个工具
    Opencv图像边缘检测——Roberts算子(手写)、Sobel算子(手写和调包)、Scharr算子、Laplacian算子
  • 原文地址:https://blog.csdn.net/qq_35493807/article/details/125618796