• RabbitMQ 利用DelayExchange插件实现延迟队列


    源码在文章末尾👇🏻


     

    📌前置学习知识       

    1. 如何安装RabbitMQ(DelayExchange)插件

    2. 消息丢失案例(returnCallback/confirmCallback)


    🕰延迟队列工作场景

    大家在使用某宝的时候应该会遇到这样的场景, 自己选择的商品之后, 点击了提交订单, 但是并没有发起支付, 随后在待支付的页面就会看见刚刚的订单正在倒计时, 并且提示剩余时间 这里就会用到延迟队列, 等待时间倒计时结束后, 用户仍没有支付订单, 就会发送消息取消订单


    ⌛️声明延迟队列, 并监听消息
     

    这里值得注意的是, 在声明参数的时候多了一个 delayed = "true" 代表的意思是具有延迟功能的交换机

    1. @RabbitListener(bindings = @QueueBinding(
    2. value = @Queue(name = "delay.queue", durable = "true"),
    3. //delayed = "true" 标明是具有延迟功能的交换机
    4. exchange = @Exchange(name = "delay.direct", delayed = "true"),
    5. key = "delay"
    6. ))
    7. public void listenDelayExchange(String msg) {
    8. log.info("消费者接收到了delay.queue的延迟消息");
    9. }

     重启我们的消费者后, 在web页面上看到我们刚刚新建的交换机


    ⏱发送延迟消息

    这里需要注意的是, 之前设置延迟时间是通过setExpiration("5000")来设置, 在给延迟交换机发送消息的时候需要改为setHeader("x-delay", 5000)

    1. @Test
    2. public void testSendDelayMessage(){
    3. Message message = MessageBuilder
    4. .withBody("test 发送延迟消息".getBytes(StandardCharsets.UTF_8))
    5. .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
    6. //设置延迟时间五秒
    7. .setHeader("x-delay", 5000)
    8. .build();
    9. //可以用分布式高并发的ID
    10. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    11. rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
    12. log.info("发送消息成功!");
    13. }

    发送消息之后控制台的log日志显示了发送消息成功! 但是紧接着也显示了配置的 ReturnCallBack的错误日志, 那消息到底有没有发成功呢?

     在查看客户端监听消息的日志, 我们发现消息确实已经成功接收到, 那就证明刚刚的消息已经发送成功!

     回到刚刚的问题, ReturnCallBack的错误日志是如何产生的, 这里我们就需要了解一下延迟交换机的原理 : 当消息发送到延迟交换机中, 正常情况下交换机应该立即发送消息给队列, 不具备消息的存储功能, 而延迟交换机会帮你把消息暂存, 等过完5秒钟后才发送, 所以消息没有进行转发才导致刚刚的报错. 

    这样一来, 我们知道了错误的原因就可以开始对症下药, 根据日志中的信息我们可以做出如下判断

    消息发送到队列失败,响应码:312, 失败原因:NO_ROUTE, 交换机: delay.direct, 路由key:delay, 消息: (Body:'[B@376ff483(byte[23])' MessageProperties [headers={spring_returned_message_correlation=7a7451f6-9d7a-4205-8339-7664848c2866}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, receivedDelay=5000, deliveryTag=0])
     

    根据MessageProperties中的receivedDelay属性去做判断 如果时间长度 > 0 就代表这是一个延迟消息了

     代码如下 : 

    1. @Configuration
    2. @Slf4j
    3. public class CommonConfig implements ApplicationContextAware {
    4. @Override
    5. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    6. //获取RabbitTemplate对象
    7. RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
    8. //配置ReturnCallBack
    9. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    10. //判断是否是延迟消息
    11. if (message.getMessageProperties().getReceivedDelay() > 0) {
    12. return;
    13. }
    14. // 记录日志
    15. log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
    16. replyCode, replyText, exchange, routingKey, message);
    17. // 如果有需要的话,重发消息
    18. });
    19. }
    20. }

    运行结果, 也不会有刚才的报错日志了 


    链接:https://pan.baidu.com/s/1il41ywFnYM4_q3MU9GN_MQ 
    提取码:heng

  • 相关阅读:
    模板引擎Thymeleaf和监听器
    天宇优配|GDR海外发行热情高 资本市场互联互通提速
    uni-app集成使用SQLite
    百度竞价排名推广对比自然排名哪一个更具优势-华媒舍
    使用GPU搭建支持玛雅(Maya)和Adobe AI,DW,PS的职校云计算机房
    markdown语法转换成html渲染到页面
    多策略协同改进的阿基米德优化算法及其应用(Matlab代码实现)
    计算机组成原理知识总结(三)存储系统
    JWT令牌实现登陆校验
    Python学习笔记--字符、变量、数据类型与注释
  • 原文地址:https://blog.csdn.net/qq_45481709/article/details/126561395