• 【SpringBoot整合MQ】-----SpringBoot整合RocketMQ


    本专栏将从基础开始,循序渐进,以实战为线索,逐步深入SpringBoot相关知识相关知识,打造完整的SpringBoot学习步骤,提升工程化编码能力和思维能力,写出高质量代码。希望大家都能够从中有所收获,也请大家多多支持。
    专栏地址:SpringBoot专栏
    本文涉及的代码都已放在gitee上:gitee地址
    如果文章知识点有错误的地方,请指正!大家一起学习,一起进步。
    专栏汇总:专栏汇总


    相应MQ下载链接: 下载链接

    SpringBoot整合RocketMQ

    ​ RocketMQ由阿里研发,后捐赠给apache基金会,目前是apache基金会顶级项目之一,也是目前市面上的MQ产品中较为流行的产品之一,它遵从AMQP协议。

    安装

    ​ windows版安装包下载地址:https://rocketmq.apache.org/

    ​ 下载完毕后得到zip压缩文件,解压缩即可使用,解压后得到如下文件

    在这里插入图片描述

    ​ RocketMQ安装后需要配置环境变量,具体如下:

    • ROCKETMQ_HOME :安装目录
    • NAMESRV_ADDR (建议): 127.0.0.1:9876

    ​ 关于NAMESRV_ADDR对于初学者来说建议配置此项,也可以通过命令设置对应值,操作略显繁琐,建议配置。系统学习RocketMQ知识后即可灵活控制该项。

    RocketMQ工作模式

    ​ 在RocketMQ中,处理业务的服务器称为broker,生产者与消费者不是直接与broker联系的,而是通过命名服务器进行通信。broker启动后会通知命名服务器自己已经上线,这样命名服务器中就保存有所有的broker信息。当生产者与消费者需要连接broker时,通过命名服务器找到对应的处理业务的broker,因此命名服务器在整套结构中起到一个信息中心的作用。并且broker启动前必须将命名服务器先启动。

    在这里插入图片描述

    启动服务器

    mqnamesrv		# 启动命名服务器
    mqbroker		# 启动broker
    
    • 1
    • 2

    ​ 运行bin目录下的mqnamesrv命令即可启动命名服务器,默认对外服务端口9876。

    ​ 运行bin目录下的mqbroker命令即可启动broker服务器,如果环境变量中没有设置NAMESRV_ADDR则需要在运行mqbroker指令前通过set指令设置NAMESRV_ADDR的值,并且每次开启均需要设置此项。

    测试服务器启动状态

    ​ RocketMQ提供有一套测试服务器功能的测试程序,运行bin目录下的tools命令即可使用。

    tools org.apache.rocketmq.example.quickstart.Producer		# 生产消息
    tools org.apache.rocketmq.example.quickstart.Consumer		# 消费消息
    
    • 1
    • 2

    整合(异步消息)

    步骤①:导入springboot整合RocketMQ的starter,此坐标不由springboot维护版本

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    步骤②:配置RocketMQ的服务器地址

    rocketmq:
      name-server: localhost:9876
      producer:
        group: group_rocketmq
    
    • 1
    • 2
    • 3
    • 4

    ​ 设置默认的生产者消费者所属组group。

    步骤③:使用RocketMQTemplate操作RocketMQ

    @Service
    public class MessageServiceRocketmqImpl implements MessageService {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @Override
        public void sendMessage(String id) {
            System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id:"+id);
            SendCallback callback = new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("消息发送成功");
                }
                @Override
                public void onException(Throwable e) {
                    System.out.println("消息发送失败!!!!!");
                }
            };
            rocketMQTemplate.asyncSend("order_id",id,callback);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    ​ 使用asyncSend方法发送异步消息。

    步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

    @Component
    @RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq")
    public class MessageListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String id) {
            System.out.println("已完成短信发送业务(rocketmq),id:"+id);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ​ RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接口,泛型为消息类型。

    ​ 使用注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。

    总结

    1. springboot整合RocketMQ使用RocketMQTemplate对象作为客户端操作消息队列
    2. 操作RocketMQ需要配置RocketMQ服务器地址,默认端口9876
    3. 企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@RocketMQMessageListener
  • 相关阅读:
    网络超时检测-11.9
    基于Java的新闻资讯平台系统设计与实现(源码+lw+部署文档+讲解等)
    宝贝快出生的这三个表现,孕妈尽快去医院待产
    word怎么公式求平均值
    Semantic Kernel入门系列:通过依赖注入管理对象和插件
    安装插件时Vscode XHR Failed 报错ERR_CERT_AUTHORITY_INVALID
    Python文件处理相关操作
    flume采集mysql日志数据发送到kafka
    【K8S专栏】Kubernetes有状态应用管理
    激光雷达发射的PCB布局
  • 原文地址:https://blog.csdn.net/Learning_xzj/article/details/125520717