• 【SpringBoot整合MQ】-----SpringBoot整合RabbitMQ


    本专栏将从基础开始,循序渐进,以实战为线索,逐步深入SpringBoot相关知识相关知识,打造完整的SpringBoot学习步骤,提升工程化编码能力和思维能力,写出高质量代码。希望大家都能够从中有所收获,也请大家多多支持。
    专栏地址:SpringBoot专栏
    本文涉及的代码都已放在gitee上:gitee地址
    如果文章知识点有错误的地方,请指正!大家一起学习,一起进步。
    专栏汇总:专栏汇总


    相应MQ下载链接: 下载链接

    SpringBoot整合RabbitMQ

    ​ RabbitMQ是MQ产品中的目前较为流行的产品之一,它遵从AMQP协议。RabbitMQ的底层实现语言使用的是Erlang,所以安装RabbitMQ需要先安装Erlang。

    Erlang安装

    在这里插入图片描述

    ​ 下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕需要重启,需要重启,需要重启。

    ​ 安装的过程中可能会出现依赖Windows组件的提示,根据提示下载安装即可,都是自动执行的,如下:

    在这里插入图片描述

    ​ Erlang安装后需要配置环境变量,否则RabbitMQ将无法找到安装的Erlang。需要配置项如下,作用等同JDK配置环境变量的作用。

    • ERLANG_HOME
    • PATH

    安装

    ​ windows版安装包下载地址:https://rabbitmq.com/install-windows.html

    ​ 下载完毕后得到exe安装文件,一键傻瓜式安装,安装完毕后会得到如下文件
    在这里插入图片描述

    启动服务器

    rabbitmq-service.bat start		# 启动服务
    rabbitmq-service.bat stop		# 停止服务
    rabbitmqctl status				# 查看服务状态
    
    • 1
    • 2
    • 3

    ​ 运行sbin目录下的rabbitmq-service.bat命令即可,start参数表示启动,stop参数表示退出,默认对外服务端口5672。

    ​ 注意:启动rabbitmq的过程实际上是开启rabbitmq对应的系统服务,需要管理员权限方可执行。

    ​ 说明:有没有感觉5672的服务端口很熟悉?activemq与rabbitmq有一个端口冲突问题,学习阶段无论操作哪一个?请确保另一个处于关闭状态。

    ​ 说明:不喜欢命令行的小伙伴可以使用任务管理器中的服务页,找到RabbitMQ服务,使用鼠标右键菜单控制服务的启停。

    在这里插入图片描述

    访问web管理服务

    ​ RabbitMQ也提供有web控制台服务,但是此功能是一个插件,需要先启用才可以使用。

    rabbitmq-plugins.bat list							# 查看当前所有插件的运行状态
    rabbitmq-plugins.bat enable rabbitmq_management		# 启动rabbitmq_management插件
    
    • 1
    • 2

    ​ 启动插件后可以在插件运行状态中查看是否运行,运行后通过浏览器即可打开服务后台管理界面

    http://localhost:15672
    
    • 1

    ​ web管理服务默认端口15672,访问后可以打开RabbitMQ的管理界面,如下:

    在这里插入图片描述

    ​ 首先输入访问用户名和密码,初始化用户名和密码相同,均为:guest,成功登录后进入管理后台界面,如下:

    在这里插入图片描述

    整合(direct模型)

    ​ RabbitMQ满足AMQP协议,因此不同的消息模型对应的制作不同,先使用最简单的direct模型开发。

    步骤①:导入springboot整合amqp的starter,amqp协议默认实现为rabbitmq方案

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4

    步骤②:配置RabbitMQ的服务器地址

    spring:
      rabbitmq:
        host: localhost
        port: 5672
    
    • 1
    • 2
    • 3
    • 4

    步骤③:初始化直连模式系统设置

    ​ 由于RabbitMQ不同模型要使用不同的交换机,因此需要先初始化RabbitMQ相关的对象,例如队列,交换机等

    @Configuration
    public class RabbitConfigDirect {
        @Bean
        public Queue directQueue(){
            return new Queue("direct_queue");
        }
        @Bean
        public Queue directQueue2(){
            return new Queue("direct_queue2");
        }
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange("directExchange");
        }
        @Bean
        public Binding bindingDirect(){
            return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
        }
        @Bean
        public Binding bindingDirect2(){
            return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    ​ 队列Queue与直连交换机DirectExchange创建后,还需要绑定他们之间的关系Binding,这样就可以通过交换机操作对应队列。

    步骤④:使用AmqpTemplate操作RabbitMQ

    @Service
    public class MessageServiceRabbitmqDirectImpl implements MessageService {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Override
        public void sendMessage(String id) {
            System.out.println("待发送短信的订单已纳入处理队列(rabbitmq direct),id:"+id);
            amqpTemplate.convertAndSend("directExchange","direct",id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    ​ amqp协议中的操作API接口名称看上去和jms规范的操作API接口很相似,但是传递参数差异很大。

    步骤⑤:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

    @Component
    public class MessageListener {
        @RabbitListener(queues = "direct_queue")
        public void receive(String id){
            System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    ​ 使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。

    整合(topic模型)

    步骤①:同上

    步骤②:同上

    步骤③:初始化主题模式系统设置

    @Configuration
    public class RabbitConfigTopic {
        @Bean
        public Queue topicQueue(){
            return new Queue("topic_queue");
        }
        @Bean
        public Queue topicQueue2(){
            return new Queue("topic_queue2");
        }
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange("topicExchange");
        }
        @Bean
        public Binding bindingTopic(){
            return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.id");
        }
        @Bean
        public Binding bindingTopic2(){
            return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.orders.*");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    ​ 主题模式支持routingKey匹配模式,*表示匹配一个单词,#表示匹配任意内容,这样就可以通过主题交换机将消息分发到不同的队列中,详细内容请参看RabbitMQ系列课程。

    匹配键topic.*.*topic.#
    topic.order.idtruetrue
    order.topic.idfalsefalse
    topic.sm.order.idfalsetrue
    topic.sm.idfalsetrue
    topic.id.ordertruetrue
    topic.idfalsetrue
    topic.orderfalsetrue

    步骤④:使用AmqpTemplate操作RabbitMQ

    @Service
    public class MessageServiceRabbitmqTopicImpl implements MessageService {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Override
        public void sendMessage(String id) {
            System.out.println("待发送短信的订单已纳入处理队列(rabbitmq topic),id:"+id);
            amqpTemplate.convertAndSend("topicExchange","topic.orders.id",id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    ​ 发送消息后,根据当前提供的routingKey与绑定交换机时设定的routingKey进行匹配,规则匹配成功消息才会进入到对应的队列中。

    步骤⑤:使用消息监听器在服务器启动后,监听指定队列

    @Component
    public class MessageListener {
        @RabbitListener(queues = "topic_queue")
        public void receive(String id){
            System.out.println("已完成短信发送业务(rabbitmq topic 1),id:"+id);
        }
        @RabbitListener(queues = "topic_queue2")
        public void receive2(String id){
            System.out.println("已完成短信发送业务(rabbitmq topic 22222222),id:"+id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    ​ 使用注解@RabbitListener定义当前方法监听RabbitMQ中指定名称的消息队列。

    总结

    1. springboot整合RabbitMQ提供了AmqpTemplate对象作为客户端操作消息队列
    2. 操作ActiveMQ需要配置ActiveMQ服务器地址,默认端口5672
    3. 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RabbitListener
    4. RabbitMQ有5种消息模型,使用的队列相同,但是交换机不同。交换机不同,对应的消息进入的策略也不同
  • 相关阅读:
    Python多线程实战:多线程并行很快,但写文件要加锁
    Java新手小白入门篇 API - IO流
    大数据架构系列:如何理解湖仓一体?
    CSS总结第八天 HTML5 + CSS3提高
    uniapp实战项目 (仿知识星球App) - - tabBar配置
    Linux-System V信号量
    代码随想录刷题记录:DP系列
    设计模式之适配器模式
    [Vue3]简易时间轴组件
    亚马逊API调用方法
  • 原文地址:https://blog.csdn.net/Learning_xzj/article/details/125520649