1. 如何安装RabbitMQ(DelayExchange)插件
2. 消息丢失案例(returnCallback/confirmCallback)
大家在使用某宝的时候应该会遇到这样的场景, 自己选择的商品之后, 点击了提交订单, 但是并没有发起支付, 随后在待支付的页面就会看见刚刚的订单正在倒计时, 并且提示剩余时间 这里就会用到延迟队列, 等待时间倒计时结束后, 用户仍没有支付订单, 就会发送消息取消订单
这里值得注意的是, 在声明参数的时候多了一个 delayed = "true" 代表的意思是具有延迟功能的交换机
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "delay.queue", durable = "true"),
- //delayed = "true" 标明是具有延迟功能的交换机
- exchange = @Exchange(name = "delay.direct", delayed = "true"),
- key = "delay"
- ))
- public void listenDelayExchange(String msg) {
- log.info("消费者接收到了delay.queue的延迟消息");
- }
重启我们的消费者后, 在web页面上看到我们刚刚新建的交换机
这里需要注意的是, 之前设置延迟时间是通过setExpiration("5000")来设置, 在给延迟交换机发送消息的时候需要改为setHeader("x-delay", 5000)
- @Test
- public void testSendDelayMessage(){
- Message message = MessageBuilder
- .withBody("test 发送延迟消息".getBytes(StandardCharsets.UTF_8))
- .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
- //设置延迟时间五秒
- .setHeader("x-delay", 5000)
- .build();
-
- //可以用分布式高并发的ID
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
-
- log.info("发送消息成功!");
- }
发送消息之后控制台的log日志显示了发送消息成功! 但是紧接着也显示了配置的 ReturnCallBack的错误日志, 那消息到底有没有发成功呢?
在查看客户端监听消息的日志, 我们发现消息确实已经成功接收到, 那就证明刚刚的消息已经发送成功!
回到刚刚的问题, ReturnCallBack的错误日志是如何产生的, 这里我们就需要了解一下延迟交换机的原理 : 当消息发送到延迟交换机中, 正常情况下交换机应该立即发送消息给队列, 不具备消息的存储功能, 而延迟交换机会帮你把消息暂存, 等过完5秒钟后才发送, 所以消息没有进行转发才导致刚刚的报错.
这样一来, 我们知道了错误的原因就可以开始对症下药, 根据日志中的信息我们可以做出如下判断
消息发送到队列失败,响应码:312, 失败原因:NO_ROUTE, 交换机: delay.direct, 路由key:delay, 消息: (Body:'[B@376ff483(byte[23])' MessageProperties [headers={spring_returned_message_correlation=7a7451f6-9d7a-4205-8339-7664848c2866}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, receivedDelay=5000, deliveryTag=0])
根据MessageProperties中的receivedDelay属性去做判断 如果时间长度 > 0 就代表这是一个延迟消息了
代码如下 :
- @Configuration
- @Slf4j
- public class CommonConfig implements ApplicationContextAware {
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- //获取RabbitTemplate对象
- RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- //配置ReturnCallBack
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- //判断是否是延迟消息
- if (message.getMessageProperties().getReceivedDelay() > 0) {
- return;
- }
- // 记录日志
- log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
- replyCode, replyText, exchange, routingKey, message);
- // 如果有需要的话,重发消息
- });
- }
- }
运行结果, 也不会有刚才的报错日志了
链接:https://pan.baidu.com/s/1il41ywFnYM4_q3MU9GN_MQ
提取码:heng