• Springboot 配置使用 RabbitMQ 并实现延时队列



    前言

    RabbitMQ作用:举几个例子,1、系统解耦,A系统无需关心B系统是否执行成功,无需等待B系统响应,直接把操作扔给mq就可以干其他事情了。2、系统使用高峰期,每秒产生10000条消息需要存储,一次性存入数据库恐怕不太行,所以先把数据发送到 RabbitMQ ,然后设置延时队列,每秒从队列取出1000条存入数据库,这样可以减少数据库压力。3、购买商品下订单以后,发送到延时队列,如果20分钟后没有付款,则从队列删除订单,也就是自动取消订单,如果支付了,则取出存入数据库,下单成功。


    一、安装 RabbitMQ

    Windows安装

    太简单,自己bing一下

    Linux安装

    rabbitmq需要erlang语言环境
    更新 apt 库,安装 erlang 环境,然后执行 erl 查看是否安装成功

    apt update
    apt install erlang
    erl
    
    • 1
    • 2
    • 3

    安装 rabbitmq

    apt install rabbitmq-server
    
    • 1

    查看 rabbitmq 运行状态

    systemctl status rabbitmq-server
    
    • 1

    开启图形化管理界面,然后就可以访问 ip:15672,默认账号密码是 guest

    rabbitmq-plugins enable rabbitmq_management
    
    • 1

    在这里插入图片描述

    默认的guest用户是只能通过本机访问的,所以远程管理后台界面登录需要配置个用户,才能通过外网浏览器访问

    #账号root,密码root
    rabbitmqctl add_user root root
    # 设置为管理员账户
    rabbitmqctl set_user_tags root administrator
    # 分配所有权限
    rabbitmqctl set_permissions -p / root “.*” “.*” “.*”
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    开放防火墙 5672 和 15672 端口

    # Debian/Ubuntu ufw
    ufw allow 5672
    ufw allow 15672
    ufw reload
    # Debian/Ubuntu iptables(这个叼毛防火墙好麻烦,我没用过,不知道是不是这样)
    iptables -A INPUT -p tcp --dport 5672 -j ACCEPT
    iptables -A INPUT -p tcp --dport 15672 -j ACCEPT
    iptables-restore
    # CentOS
    firewall-cmd --zone=public --add-port=5672/tcp --permanent
    firewall-cmd --zone=public --add-port=15672/tcp --permanent
    firewall-cmd --reload
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    二、新建项目

    新建一个 provider 一个 consumer,两个 springboot 项目,都需要引入下面的依赖,或者新建的时候勾选自动添加 rabbitmq 的依赖
    在这里插入图片描述

    1、引入依赖

    <dependency>
        <groupId>org.springframework.amqpgroupId>
        <artifactId>spring-rabbit-testartifactId>
        <scope>testscope>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、配置 yml

    provider 和 consumer 都这样配置,端口改成不一样就行了

    server:
      port: 8081
    spring:
      rabbitmq:
        host: 192.168.0.105
        port: 5672
        username: root
        password: root
        virtualHost: /
        # 确认机制
        publisher-confirm-type: correlated
        # 发布确认,如果不配置确认机制,发布确认也不用配置
        publisher-returns: true
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3、启动类开启 Rabbitmq 注解

    consumer 和 provider 都需要这个注解

    在这里插入图片描述

    4、配置 provider 的 RabbitmqConfig

    大家可以根据 15672 那个图形化管理界面看看下面的一些概念

    • Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输
    • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    • Queue:消息的载体,每个消息都会被投到一个或多个队列。
    • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
    • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    • vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
    • Producer:消息生产者,就是投递消息的程序.
    • Consumer:消息消费者,就是接受消息的程序.
    • Channel:消息通道,在客户端的每个连接里,可建立多个channel.
    package icu.xuyijie.provider.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Scope;
    import org.springframework.web.filter.CharacterEncodingFilter;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author 徐一杰
     * @date 2022/9/30 9:38
     * @description
     */
    @SpringBootConfiguration
    public class RabbitmqConfig {
    
        public static final String QUEUE_MESSAGE = "queue_message";
        public static final String QUEUE_ORDER = "queue_order";
        public static final String EXCHANGE_A = "exchange_A";
        /**
         * # 是通配符,可以匹配任意,如下面的可以匹配到 aa.bb.message.cc.dd
         * 还有 * 是匹配一个.里面的字符,如 .*.message 只能匹配 .aa.message,不能匹配 .aa.bb.message
         */
        public static final String ROUTING_KEY_MESSAGE = "#.message.#";
        public static final String ROUTING_KEY_ORDER = "#.order.#";
    
        /**
         * 这个可以不写,写了的话,以这个为准,会覆盖掉 yml 的配置
         * @return
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.0.105", 5672);
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("root");
            connectionFactory.setVirtualHost("/");
            //确认机制
            connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
            //发布确认,如果不配置确认机制,发布确认也不用配置
            connectionFactory.setPublisherReturns(true);
            return connectionFactory;
        }
    
        /**
         * 使用 yml 连接的话这个可以不写,这个是配合 connectionFactory 的
         * RabbitMQ的使用入口。scope必须是prototype类型
         * @return
         */
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
            template.setMessageConverter(this.jsonMessageConverter());
            template.setMandatory(true);
            return template;
        }
    
        /**
         * 序列化json
         * @return
         */
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        public CharacterEncodingFilter characterEncodingFilter() {
            CharacterEncodingFilter filter = new CharacterEncodingFilter();
            filter.setEncoding("UTF-8");
            filter.setForceEncoding(true);
            return filter;
        }
    
        /**
         * 声明交换机
         *  针对消费者配置
         *  设置交换机类型
         *  将队列绑定到交换机
         *    FanoutExchange: 将消息分发到所有的绑定队列,无routing key的概念
         *    HeadersExchange:通过添加属性key-value匹配
         *    DirectExchange: 按照routing key分发到指定队列
         *    TopicExchange: 多关键字匹配
         * @return
         */
        @Bean
        public Exchange exchangeA(){
            //durable(true) 持久化,mq重启之后交换机还在
            return ExchangeBuilder.topicExchange(EXCHANGE_A).durable(true).build();
        }
    
        /**
         * 声明QUEUE_MESSAGE队列
         * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
         * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         * @return
         */
        @Bean
        public Queue queueMessage(){
            return new Queue(QUEUE_MESSAGE, true, false, false);
        }
    
        /**
         * 声明QUEUE_ORDER队列
         * @return
         */
        @Bean
        public Queue queueOrder(){
            return new Queue(QUEUE_ORDER);
        }
    
        /**
         * 队列绑定交换机,指定routingKey
         * @return
         */
        @Bean
        public Binding bindingQueueMessage(){
            return BindingBuilder.bind(queueMessage()).to(exchangeA()).with(ROUTING_KEY_MESSAGE).noargs();
        }
    
        /**
         * 队列绑定交换机,指定routingKey
         * @return
         */
        @Bean
        public Binding bindingQueueOrder(){
            return BindingBuilder.bind(queueOrder()).to(exchangeA()).with(ROUTING_KEY_ORDER).noargs();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142

    5、provider 发送消息

    我们因为配置了确认机制,所以我们配置了回调方法,这里使用构造器注入 rabbitTemplate,如果不配置回调方法,则可以使用 @Autowired 注入,并且类无需实现 RabbitTemplate.ConfirmCallback,sendExchange 方法没有使用回调方法,使用回调方法的话需要像 sendCallback 方法一样多传一个值 correlationId

    package icu.xuyijie.provider.controller;
    
    import icu.xuyijie.provider.config.RabbitmqConfig;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.UUID;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @author 徐一杰
     * @date 2022/9/30 10:08
     * @description
     */
    @RestController
    @RequestMapping("/provider")
    public class ProviderController implements RabbitTemplate.ConfirmCallback {
    
    	/**
    	 * 我们因为配置了确认机制,所以我们配置了回调方法,这里使用构造器注入 rabbitTemplate,如果不配置
    	 * 回调方法,则可以使用 @Autowired 注入,并且类无需实现 RabbitTemplate.ConfirmCallback
    	 */
        private final RabbitTemplate rabbitTemplate;
    
        public ProviderController(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
            //设置回调为当前类对象
            this.rabbitTemplate.setConfirmCallback(this);
        }
    
        @RequestMapping("/sendExchange")
        public void sendExchange(){
            //使用rabbitTemplate发送消息
            String message = "这是一条发送到exchangeA的消息";
            /**
             * 参数:
             * 1、交换机名称
             * 2、routingKey
             * 3、消息内容
             */
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_A, "a.message", message);
        }
        
        /**
         * 如果使用回调方法,则需要多传一个参数 correlationId
         */
        @RequestMapping("/sendCallback")
        public void sendCallback(){
            String message = "这是一条发送到exchangeA的消息";
            //构建回调id为uuid
            String callBackId = UUID.randomUUID().toString();
            CorrelationData correlationId = new CorrelationData(callBackId);
            //发送消息到消息队列
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_A, "a.message", message, correlationId);
            System.out.println("发送回调id: " + callBackId);
        }
    
        /**
         * 消息回调确认方法
         * @param correlationData 请求数据对象
         * @param ack 是否发送成功
         * @param s
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            assert correlationData != null;
            System.out.println("这是回调方法打印的:回调id: " + correlationData.getId());
            try {
                System.out.println("这是回调方法打印的:回调message: " + correlationData.getFuture().get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
            if (ack) {
                System.out.println("这是回调方法打印的:消息发送成功");
            } else {
                System.out.println("消息发送失败" + s);
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    6、consumer 接收消息

    @RabbitListener就是监听的队列,可以监听多个

    package icu.xuyijie.consumer.consumer;
    
    import com.rabbitmq.client.Channel;
    import icu.xuyijie.consumer.config.RabbitmqConfig;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @author 徐一杰
     * @date 2022/9/30 10:10
     * @description
     */
    @Component
    public class ReceiveHandler {
    
        @RabbitListener(queues = {"queue_message", "queue_order"})
        public void receiveMessage(Message message, Channel channel){
            System.out.println("接收 queue_message 和 queue_order 队列的消息 " + message);
            System.out.println("对应Channel " + channel);
        }
    
        @RabbitListener(queues = {"queue_order"})
        public void receiveOrder(Message message, Channel channel){
            System.out.println("接收 queue_order  队列的消息" + message);
            System.out.println("对应Channel " + channel);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    7、演示

    我们直接调用 sendCallback 这个接口

    在这里插入图片描述

    consumer 接收到消息

    在这里插入图片描述

    provider 触发回调方法

    在这里插入图片描述


    三、延时队列

    1、在 provider 的 RabbitmqConfig 中增加配置

    增加了一个一个交换机、一个队列、一个路由键的配置,注意 delayQueue() 方法,我的注释有解释

    	public static final String QUEUE_DELAY = "queue_delay";
        public static final String EXCHANGE_DELAY = "exchange_delay";
        public static final String ROUTER_DELAY_KEY = "router_delay_key";
    
        /**
         * 延迟交换机
         *
         * @return
         */
        @Bean
        public DirectExchange delayExchange() {
            return new DirectExchange(EXCHANGE_DELAY);
        }
    
        /**
         * 延迟队列
         * map 的设置意思是接收此队列的延迟消息需要监听 EXCHANGE_RECEIVE 队列,直接监听 QUEUE_DELAY 无法实现延时队列
         *
         * @return
         */
        @Bean
        public Queue delayQueue() {
            Map<String, Object> map = new HashMap<>(16);
            map.put("x-dead-letter-exchange", EXCHANGE_A);
            map.put("x-dead-letter-routing-key", ROUTING_KEY_ORDER);
            return new Queue(QUEUE_DELAY, true, false, false, map);
        }
    
        /**
         * 给延迟队列绑定交换机
         *
         * @return
         */
        @Bean
        public Binding delayBinding() {
            return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(ROUTER_DELAY_KEY);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    2、在 provider 增加一个接口

    发送消息到我们刚刚配置的延时交换机

    	/**
         * 给延迟队列发送消息
         */
        @RequestMapping("/sendDelay")
        public void sendDelay(){
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_DELAY, RabbitmqConfig.ROUTER_DELAY_KEY, "这是一条延时队列的消息", message -> {
                message.getMessageProperties().setExpiration("3000");
                return message;
            }, new CorrelationData(UUID.randomUUID().toString()));
            System.out.println("延时队列发送成功");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3、consumer 接收延时消息

    注意,上面我们把延时消息发送到了延时队列 EXCHANGE_DELAY,但是我们接收,要监听 RabbitmqConfig中 delayQueue() 方法 配置的 EXCHANGE_A,路由键为 ROUTING_KEY_ORDER,也就是说无需改动 consumer ,consumer 的两个方法都能收到延时消息,因为他们都监听了 ROUTING_KEY_ORDER 对应的队列

    直接调用 sendDelay 方法,2次(因为两个方法都监听 queue_order,所以他们会交替获得消息)

    在这里插入图片描述
    3秒后 consumer 的两个方法都能接收到延时消息

    在这里插入图片描述


    总结

  • 相关阅读:
    【C++】从使用string类到模拟实现string类
    HDLBits-Fsm3onehot
    汽车电子AEC Q101车规认证FDD9507L-F085 P沟道MOS管
    Linux云计算 |【第二阶段】NETWORK-DAY3
    Vuex的搭建与使用
    Plaxis Python 命令流自动化处理、岩土工程渗流问题之有限单元法
    代码无法输出操作值,如何解决?(语言-c语言)
    python_剔除符合要求的数据并生成新的文件
    Cython加密python代码防止反编译
    小程序的使用
  • 原文地址:https://blog.csdn.net/qq_48922459/article/details/127207359