• RabbitMQ学习总结-延迟消息


    1.死信交换机

    一致不被消费的信息/过期的信息/被标记nack/reject的信息,这些消息都可以进入死信交换机,但是首先要配置的有私信交换机。私信交换机可以再RabbitMQ的客户端上选定配置-dead-letter-exchange。

    2.延迟消息

    像我们买车票,外卖订单,如果下了单没付款,都会显示一个30分钟后未支付将进行取消订单,这种都算是延迟消息,设置了过期时间,一直不消费,到期后就会进入死信队列。

    MQ提供了延迟消息的组件:

    Scheduling Messages with RabbitMQ | RabbitMQGitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

    安装组件后,在使用时,基于注解要声明延迟交换机

    1. @RabbitListener(bindings = @QueueBinding(
    2. value = @Queue(name = "delay.queue", durable = "true"),
    3. exchange = @Exchange(name = "delay.direct", delayed = "true"),
    4. key = "delay"
    5. ))
    6. public void listenDelayMessage(String msg){
    7. log.info("接收到delay.queue的延迟消息:{}", msg);
    8. }

    发送延迟消息:设置延长时间

    1. @Test
    2. void testPublisherDelayMessage() {
    3. // 1.创建消息
    4. String message = "hello, delayed message";
    5. // 2.发送消息,利用消息后置处理器添加消息头
    6. rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
    7. @Override
    8. public Message postProcessMessage(Message message) throws AmqpException {
    9. // 添加延迟消息属性
    10. message.getMessageProperties().setDelay(5000);
    11. return message;
    12. }
    13. });
    14. }

    3.对于订单的延迟消息,应该主动去查询是否已经支付成功,可以定时去发送消息,查询状态,确认是否已经状态改变。

    消息监听侧,如果初始没有支付,可以根据上图不同的节点去再次发起消息查询:

    1. @Slf4j
    2. @Component
    3. @RequiredArgsConstructor
    4. public class OrderStatusListener {
    5. private final IOrderService orderService;
    6. private final PayClient payClient;
    7. private final RabbitTemplate rabbitTemplate;
    8. @RabbitListener(bindings = @QueueBinding(
    9. value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
    10. exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),
    11. key = MqConstants.DELAY_ORDER_ROUTING_KEY
    12. ))
    13. public void listenOrderCheckDelayMessage(MultiDelayMessage msg) {
    14. // 1.获取消息中的订单id
    15. Long orderId = msg.getData();
    16. // 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭
    17. Order order = orderService.getById(orderId);
    18. if (order == null || order.getStatus() > 1) {
    19. // 订单不存在或交易已经结束,放弃处理
    20. return;
    21. }
    22. // 3.可能是未支付,查询支付服务
    23. PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);
    24. if (payOrder != null && payOrder.getStatus() == 3) {
    25. // 支付成功,更新订单状态
    26. orderService.markOrderPaySuccess(orderId);
    27. return;
    28. }
    29. // 4.确定未支付,判断是否还有剩余延迟时间
    30. if (msg.hasNextDelay()) {
    31. // 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值
    32. int delayVal = msg.removeNextDelay().intValue();
    33. // 4.2.发送延迟消息
    34. rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
    35. message -> {
    36. message.getMessageProperties().setDelay(delayVal);
    37. return message;
    38. });
    39. return;
    40. }
    41. // 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单
    42. orderService.cancelOrder(orderId);
    43. }
    44. }

  • 相关阅读:
    【React Hooks原理 - useRef】
    InnoDB学习之死锁
    《Cesium 进阶知识点》- 计算多个 ImageryLayer 的最大包围盒
    基于Or-Tools的整数规划问题求解
    class09:ejs模块
    德人合科技 | —数据泄露可能会对公司造成哪些影响?
    独立站运营和facebook投放怎么做
    python 微信小程序 英语单词小程序代码分享
    工业智能网关BL110应用之七十九: 实现西门子S7-400 PLC 接入阿里云平台
    SparkSQL - 介绍及使用 Scala、Java、Python 三种语言演示
  • 原文地址:https://blog.csdn.net/qq_28339335/article/details/136742932