1.死信交换机
一致不被消费的信息/过期的信息/被标记nack/reject的信息,这些消息都可以进入死信交换机,但是首先要配置的有私信交换机。私信交换机可以再RabbitMQ的客户端上选定配置-dead-letter-exchange。
2.延迟消息
像我们买车票,外卖订单,如果下了单没付款,都会显示一个30分钟后未支付将进行取消订单,这种都算是延迟消息,设置了过期时间,一直不消费,到期后就会进入死信队列。
MQ提供了延迟消息的组件:
Scheduling Messages with RabbitMQ | RabbitMQGitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
安装组件后,在使用时,基于注解要声明延迟交换机
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "delay.queue", durable = "true"),
- exchange = @Exchange(name = "delay.direct", delayed = "true"),
- key = "delay"
- ))
- public void listenDelayMessage(String msg){
- log.info("接收到delay.queue的延迟消息:{}", msg);
- }
发送延迟消息:设置延长时间
- @Test
- void testPublisherDelayMessage() {
- // 1.创建消息
- String message = "hello, delayed message";
- // 2.发送消息,利用消息后置处理器添加消息头
- rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- // 添加延迟消息属性
- message.getMessageProperties().setDelay(5000);
- return message;
- }
- });
- }
3.对于订单的延迟消息,应该主动去查询是否已经支付成功,可以定时去发送消息,查询状态,确认是否已经状态改变。

消息监听侧,如果初始没有支付,可以根据上图不同的节点去再次发起消息查询:
- @Slf4j
- @Component
- @RequiredArgsConstructor
- public class OrderStatusListener {
-
- private final IOrderService orderService;
-
- private final PayClient payClient;
-
- private final RabbitTemplate rabbitTemplate;
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
- exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),
- key = MqConstants.DELAY_ORDER_ROUTING_KEY
- ))
- public void listenOrderCheckDelayMessage(MultiDelayMessage
msg) { - // 1.获取消息中的订单id
- Long orderId = msg.getData();
- // 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭
- Order order = orderService.getById(orderId);
- if (order == null || order.getStatus() > 1) {
- // 订单不存在或交易已经结束,放弃处理
- return;
- }
- // 3.可能是未支付,查询支付服务
- PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);
- if (payOrder != null && payOrder.getStatus() == 3) {
- // 支付成功,更新订单状态
- orderService.markOrderPaySuccess(orderId);
- return;
- }
- // 4.确定未支付,判断是否还有剩余延迟时间
- if (msg.hasNextDelay()) {
- // 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值
- int delayVal = msg.removeNextDelay().intValue();
- // 4.2.发送延迟消息
- rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
- message -> {
- message.getMessageProperties().setDelay(delayVal);
- return message;
- });
- return;
- }
- // 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单
- orderService.cancelOrder(orderId);
- }
- }