• SpringBoot使用RabbitMQ实现延迟队列


    需求和目标

    商城系统,用户下单后若15分钟内仍未完成支付,则自动取消订单,若已支付,不做其他特殊操作
    系统还需要支持即时消息的功能,即发即收。

    名词解释

    ①即时队列:即发即收
    ②延迟队列:发了消息,没有接收方,只有消息过期后才被处理
    死信队列:延迟队列上的消息过期后,会被自动转发到死信队列中,从而最终达到延迟的目的

    实现方式

    本文采用RabbitMQ自身属性:
    TTL(Time To Live存活时间) + DLX(Dead-Letter-Exchange死信交换机)
    实现延迟队列,先将消息发到指定了TTL时长的队列A中,队列A没有消费者,也就是说,队列A中的消息肯定会过期,等消息过期后,就会加入到队列B,也就是死信队列,B队列是有消费者在监听的,一旦收到消息,就进行后续的逻辑处理,从而达到延迟效果。
    这种实现方式只能为队列设置消息延迟的时长,不能为每个消息指定延迟时长,粒度比较粗,请注意使用的业务场景!

    引入依赖

    
    
        org.springframework.boot
        spring-boot-starter-amqp
    
    

    添加配置文件

    分别声明了:即时、延迟、死信的相关信息
    其中,延迟和死信是相互配合形成了延迟队列

    # rabbitMQ配置
    mq:
      rabbit:
        host: 127.0.0.1:5672
        virtualHost: /
        username: testUser
        password: 123456
        normal-exchange: wms_exchange_normal
        normal-queue: wms_queue_normal
        normal-routing-key: wms_routing_key_normal
        delay-exchange: wms_exchange_delay
        delay-queue: wms_queue_delay
        delay-routing-key: wms_routing_key_delay
        dlx-exchange: wms_exchange_dlx
        dlx-queue: wms_queue_dlx
        dlx-routing-key: wms_routing_key_dlx
    

    配置类

    package com.nwd.common.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitConfig {
    	// 从配置文件中读取参数
        @Value("${mq.rabbit.host}")
        String HOST;
        @Value("${mq.rabbit.username}")
        String USERNAME;
        @Value("${mq.rabbit.password}")
        String PASSWORD;
    
        @Value("${mq.rabbit.normal-exchange}")
        String NORMAL_EXCHANGE;
        @Value("${mq.rabbit.normal-queue}")
        String NORMAL_QUEUE;
        @Value("${mq.rabbit.normal-routing-key}")
        String NORMAL_ROUTING_KEY;
    
    
        @Value("${mq.rabbit.delay-exchange}")
        String DELAY_EXCHANGE;
        @Value("${mq.rabbit.delay-queue}")
        String DELAY_QUEUE;
        @Value("${mq.rabbit.delay-routing-key}")
        String DELAY_ROUTING_KEY;
    
        @Value("${mq.rabbit.dlx-exchange}")
        String DLX_EXCHANGE;
        @Value("${mq.rabbit.dlx-queue}")
        String DLX_QUEUE;
        @Value("${mq.rabbit.dlx-routing-key}")
        String DLX_ROUTING_KEY;
    
        //创建mq连接
        @Bean(name = "connectionFactory")
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setUsername(USERNAME);
            connectionFactory.setPassword(PASSWORD);
            //connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setPublisherConfirms(true);
            //该方法配置多个host,在当前连接host down掉的时候会自动去重连后面的host
            connectionFactory.setAddresses(HOST);
            //connectionFactory.setPort(Integer.parseInt(port));
            return connectionFactory;
        }
    	
    	// 即时队列===========================================
        @Bean
        public Queue normalQueue() {
            return new Queue(NORMAL_QUEUE);
        }
    
        @Bean
        public DirectExchange normalDirectExchange(){
            return new DirectExchange(NORMAL_EXCHANGE);
        }
    
        @Bean
        public Binding normalBinding(){
            return BindingBuilder.bind(normalQueue())
                    .to(normalDirectExchange())
                    .with(NORMAL_ROUTING_KEY);
        }
      	// 即时队列===========================================
    
        // 延迟队列===========================================
        @Bean
        public Queue delayQueue(){
            Map map = new HashMap<>();
            //message在该队列queue的存活时间最大为15分钟
            map.put("x-message-ttl", 10000*6*15);
            //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
            map.put("x-dead-letter-exchange", DLX_EXCHANGE);
            //x-dead-letter-routing-key参数是给这个DLX指定路由键
            map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
            return new Queue(DELAY_QUEUE,true,false,false,map);
        }
    
        @Bean
        public DirectExchange delayDirectExchange(){
            return new DirectExchange(DELAY_EXCHANGE);
        }
    
        @Bean
        public Binding delayBinding(){
            return BindingBuilder.bind(delayQueue())
                    .to(delayDirectExchange())
                    .with(DELAY_ROUTING_KEY);
        }
        // 延迟队列===========================================
    
        // 死信队列===========================================
        @Bean
        public Queue dlxQueue() {
            return new Queue(DLX_QUEUE);
        }
    
        @Bean
        public DirectExchange dlxDirectExchange(){
            return new DirectExchange(DLX_EXCHANGE);
        }
    
        @Bean
        public Binding dlxBinding(){
            return BindingBuilder.bind(dlxQueue())
                    .to(dlxDirectExchange())
                    .with(DLX_ROUTING_KEY);
        }
        // 死信队列===========================================
    }
    
    

    死信队列消费者

    package com.nwd.module.mq;
    
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 死信队列消息处理
     * 此队列消费到的,是经过延迟之后的消息
     * @author niuwenda
     * @since 2024-06-03  09:50
     */
    @Slf4j
    @Component
    @RabbitListener(queues = "${mq.rabbit.dlx-queue}")
    public class DlxMsgConsumer {
        @RabbitHandler(isDefault = true)
        public void process(String msg, Message message, Channel channel) {
            try {
                // 处理消息的业务逻辑
                log.info("RabbitMq:死信队列接收到消息,{}",msg);
                // 此处应判断订单是否已完成支付,若未完成,后续继续编写取消订单逻辑
                // .....
            } catch (Exception e) {
                // 发生异常时,打印日志并拒绝消息(不重新放入队列)
                System.out.println("Error processing message: " + e.getMessage());
                /*try {
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception ex) {
                    // 处理拒绝消息的异常
                }*/
            }
        }
    }
    
    

    即时队列消费者

    保证系统有即发即收的功能,此处代码与订单需求无关

    package com.nwd.module.mq;
    
    import com.alibaba.fastjson.JSONObject;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * mq消息接收处理器
     * @author niuwenda
     * @since 2024-06-03  09:50
     */
    @Slf4j
    @Component
    @RabbitListener(queues = "${mq.rabbit.normal-queue}")
    public class MqMsgConsumer {
        @RabbitHandler(isDefault = true)
        public void process(String msg, Message message, Channel channel) {
            try {
                // 处理消息的业务逻辑
                log.info("RabbitMq1:接收到消息,{}",msg);
                JSONObject msgObj = JSONObject.parseObject(msg);
                // 手动确认消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                // 发生异常时,打印日志并拒绝消息(不重新放入队列)
                System.out.println("Error processing message: " + e.getMessage());
                /*try {
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception ex) {
                    // 处理拒绝消息的异常
                }*/
            }
        }
    }
    

    延迟消息发送

    可以写在controller中,测试时,用接口调用来发送消息

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @Value("${mq.rabbit.delay-exchange}")
    private String exchange;
    
    rabbitTemplate.convertAndSend(exchange, routingKey, param);
    log.info("RabbitMq发送消息成功:{}", param);
    

    结果

    可看到,消息延迟了10秒收到

    2024-06-03 16:09:23.640  INFO  RabbitMqUtil : RabbitMq发送消息成功:helloMQ
    2024-06-03 16:09:33.655  INFO DlxMsgConsumer : RabbitMq:死信队列接收到消息,helloMQ
    

    注意

    延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

    因此,不建议设置延迟时间过长的延迟消息,如果时间过长,建议使用任务调度。

  • 相关阅读:
    2023年Q3线上生鲜水产数据分析:市场不景气,销额同比下滑44%
    详解WMS——定义与功能
    Element实现行合并
    Go语言函数底层实现
    IP-Guard管控应用程序运行有哪几种方式?
    SSRF服务器端请求伪造
    [附源码]Python计算机毕业设计Django绿色生活交流社区网站
    正则表达式基础语法
    Word自定义模板无法在新建时使用--解决方法
    每日一博 - 浅析事务隔离级别& MVCC机制
  • 原文地址:https://blog.csdn.net/Funky_oaNiu/article/details/139417086