• 【Lilishop商城】No2-5.确定软件架构搭建四(本篇包括消息中间件RocketMQ)


     仅涉及后端,全部目录看顶部专栏,代码、文档、接口路径在:

    【Lilishop商城】记录一下B2B2C商城系统学习笔记~_清晨敲代码的博客-CSDN博客


    全篇只介绍重点架构逻辑,具体编写看源代码就行,读起来也不复杂~

    谨慎:源代码中有一些注释是错误的,有的注释意思完全相反,有的注释对不上号,我在阅读过程中就顺手更新了,并且在我不会的地方添加了新的注释,所以在读源代码过程中一定要谨慎啊!

    目录

    A1.消息中间件AMQP

    B1.消息中间件基本搭建 

    C1.测试生产消息业务

    B2.consumer 消息模块

    C1.测试消费消息业务

    剩余内容:定时任务等


    A1.消息中间件AMQP

    RocketMQ学习可以看这篇文章,特别详细:​​​​​​​RocketMQ-介绍_CSDN博客_rocketmq

    有核心四大组件 NameServer(名称服务器)、Broker(费服务器)、 Producer、Consumer;

    有几个概念:消息(Message)、主题(Topic)、标签(Tag)、队列(Queue)、生产者组(producerGroup)、消费者组(consumerGroup);

    可以理解为:

    我们想要处理的数据就是消息,每个消息属于且只能属于一个主题,但是可以属于多个标签,一个主题中可以包含多个队列,队列里面存储消息。

    在使用中,我们可以把同一类型的消息放到一个主题里面,如果这类消息有不同的操作,就可以在区分不同标签,然后分别处理。

    生产者组(producerGroup)、消费者组(consumerGroup)暂时用不到,但是要了解一下,生产者组就是一类生产者的集合。消费者组要复杂些,就是一类消费者的集合,这类消费者通常消费同一类消息,并且消费逻辑一致,所以将这些消费者分组在一起,消费者组与生产者组类似,都是将相同角色的消费者分组在一起并命名的。

    组的概念容易搞不清楚,我也没找到实例,就是自己简单测试了一下,之后再总结吧,组的概念看这里:RocketMq中组的概念_rocketmq生产者组作用

    还有一件事~消费者组有订阅逻辑,可以看阿里云的介绍,什么是订阅关系一致_消息队列RocketMQ版-阿里云帮助中心

    在开始使用前为了方便查看消息队列,我们下载一个可视化控制台rocketmq-externals,可看这篇:Mac安装RocketMQ和可视化控制台教程_-CSDN博客-可视化教程在第二大步骤

    rocketMq在4.4.0版本时引入了ACL机制,用于访问权限控制。但是我还没找到rocketmq-spring-boot-starter对应的操作版本,所以整合的jar包是否有权限访问这个需要再次确定(待学习)

    B1.消息中间件基本搭建 

    系统使用的rocketmq-spring-boot-starter,经过了封装使用起来更方便。

    在 framework 项目路径下的 pom.xml 中指定使用的依赖包,并且在总项目路径中设置包的版本号

    1. /lilishop-master/pom.xml
    2. <properties>
    3. <rocketmq-version>2.2.2rocketmq-version>
    4. properties>
    1. /lilishop-master/framework/pom.xml
    2. <dependency>
    3. <groupId>org.apache.rocketmqgroupId>
    4. <artifactId>rocketmq-spring-boot-starterartifactId>
    5. <version>${rocketmq-version}version>
    6. <exclusions>
    7. <exclusion>
    8. <artifactId>fastjsonartifactId>
    9. <groupId>com.alibabagroupId>
    10. exclusion>
    11. <exclusion>
    12. <artifactId>slf4j-apiartifactId>
    13. <groupId>org.slf4jgroupId>
    14. exclusion>
    15. exclusions>
    16. dependency>

    然后在业务包里的resource的配置文件application.xml中设置配置信息,配置服务器访问地址和生产者信息,这里注意哦,由于我们使用的是 springboot 整合的,我们可以直接通过yml设置DefaultMQProducer的配置信息,然后通过 ExtProducerResetConfiguration 创建RocketMQTemplate的bean并使用,所以当前程序中使用自动注入的RocketMQTemplate操作生产者时都是一个group里的哦,例如manager-api模块中的。

    RocketMQTemplate里面有DefaultMQProducer属性)

    1. # /lilishop-master/manager-api/src/main/resources/application.yml
    2. rocketmq:
    3. # 访问地址
    4. name-server: 127.0.0.1:9876
    5. producer:
    6. # 必须指定 group
    7. group: lili_group
    8. send-message-timeout: 30000

    现在就可以进行基本的使用了,我们要进行一下测试的。

    C1.测试生产消息业务

    我们可以写个controller类,里面注入RocketMQTemplate对象,然后生产消息.

    也可以在test里面进行测试,我这里就在test里测试生产了,一定要配置yml,

    1. # /lilishop-master/manager-api/src/test/resources/application.yml
    2. rocketmq:
    3. # 访问地址
    4. name-server: 127.0.0.1:9876
    5. producer:
    6. # 必须指定 group
    7. group: lili_group
    8. send-message-timeout: 30000

    然后创建测试方法

    1. //详见:cn.lili.test.rocketmq.MsgExtRocketMqTest
    2. @ExtendWith(SpringExtension.class)
    3. @SpringBootTest
    4. @Slf4j
    5. class MsgExtRocketMqTest {
    6. @Autowired
    7. private RocketMQTemplate rocketMQTemplate;
    8. @Test
    9. void test() {
    10. //这里用 :隔开 topic 和 tag,多个 tag可以用 || 隔开
    11. String destination = "topicTest1:tagTest1";
    12. Message message = MessageBuilder.withPayload("Context~~~~~~~"+destination).build();
    13. rocketMQTemplate.send(destination, message);
    14. log.info("生产消息:msg={}", message);
    15. Assertions.assertTrue(true);
    16. }
    17. }

     打开可视化控制台rocketmq-externals,打开消息列表,主题查询一定要选择主题,不然提示:Required String parameter 'topic' is not present。

    下面就是我们生产的消息:

    注意红色字体~ 

    B2.consumer 消息模块

    接下来做消费者的测试,lilishop系统是直接在 consumer 模块做的消息mq和后面的定时/延时任务,那我们也可以跟他们一样创建 consumer 模块,在里面添加消费者监听

    先添加依赖,由于之后的这里也会有很多业务逻辑,所以直接依赖 framework 就可以

    1. /lilishop-master/consumer/pom.xml
    2. <parent>
    3. <groupId>cn.liligroupId>
    4. <artifactId>lili-shop-parentartifactId>
    5. <version>${revision}version>
    6. <relativePath>../pom.xmlrelativePath>
    7. parent>
    8. <dependencies>
    9. <dependency>
    10. <groupId>cn.liligroupId>
    11. <artifactId>frameworkartifactId>
    12. <version>${revision}version>
    13. dependency>
    14. dependencies>

    然后在resource的配置文件application.xml中设置配置信息,一开始直接设置服务端口、日志信息和rocketmq配置就可以​​​​​​​

    1. # /lilishop-master/consumer/src/main/resources/application.yml
    2. server:
    3. port: 8886
    4. servlet:
    5. context-path: /
    6. tomcat:
    7. uri-encoding: UTF-8
    8. threads:
    9. min-spare: 50
    10. max: 1000
    11. # 日志
    12. logging:
    13. config: classpath:logback-spring.xml
    14. # 输出级别
    15. level:
    16. root: info
    17. file:
    18. # 指定路径
    19. path: lili-logs
    20. logback:
    21. rollingpolicy:
    22. # 最大保存天数
    23. max-history: 7
    24. # 每个文件最大大小
    25. max-file-size: 5MB
    26. rocketmq:
    27. # 访问地址
    28. name-server: 127.0.0.1:9876
    29. producer:
    30. # 必须指定 group
    31. group: lili_group
    32. send-message-timeout: 30000

    同时在启动类上修改 rocketmq 的日志配置为Slf4j,默认的日志配置看:org.apache.rocketmq.client.log.ClientLogger

    1. //详见:cn.lili.ConsumerApplication
    2. @SpringBootApplication
    3. public class ConsumerApplication {
    4. public static void main(String[] args) {
    5. System.setProperty("es.set.netty.runtime.available.processors", "false");
    6. //将 rocketmq 的日志修改为 Slf4j ,配合 logback.xml 输出
    7. System.setProperty("rocketmq.client.logUseSlf4j","true");
    8. SpringApplication.run(ConsumerApplication.class, args);
    9. }
    10. }

    要记得关闭security的 http.formLogin().disable();因为我们依赖的是 framework 模块~

    1. //详见:cn.lili.sucurity.ConsumerSecurityConfig
    2. @Component
    3. public class ConsumerSecurityConfig extends WebSecurityConfigurerAdapter {
    4. @Override
    5. protected void configure(HttpSecurity http) throws Exception {
    6. http.formLogin().disable();
    7. }
    8. }

    现在添加消费业务,一个注解和一个监听接口

    1. //详见:cn.lili.listener.TestMessageListener
    2. @Component
    3. @Slf4j
    4. //消费topic = "topicTest1"的消息,consumerGroup是指消费组
    5. @RocketMQMessageListener(topic = "topicTest1", consumerGroup = "consumerGroup")
    6. public class TestMessageListener implements RocketMQListener {
    7. @Override
    8. public void onMessage(MessageExt messageExt) {
    9. log.info("========TestMessageListener,topic = topicTest1 ==================");
    10. byte[] body = messageExt.getBody();
    11. String msg = new String(body);
    12. //区分 tag 再分别进行处理
    13. switch (messageExt.getTags()) {
    14. case "tagTest1":
    15. log.info("监听到消息tagTest1:msg={}", msg);
    16. break;
    17. case "tagTest2":
    18. log.info("监听到消息tagTest2:msg={}", msg);
    19. break;
    20. default:
    21. break;
    22. }
    23. }
    24. }

    C1.测试消费消息业务

    直接启动consumer模块

    我直接通过rocketmq-externals里面重新发的消息~

    剩余内容:定时任务等

  • 相关阅读:
    Spring的循环依赖
    LVS: ambighouse pin count in file “xx“ but none has xx pins问题
    FOC电机控制算法及例程
    总结90条实用的Python编程技巧
    spring下配置tomcat jdbc pool 报找不到“com.mysql.jdbc.Driver“类
    Linux- pipe()系统调用
    Python中Pandas常用函数及案例详解
    LeetCode 3 无重复字符的最长子串
    idea 如何 进行无限续期
    构建高性能物联网数据平台:EMQX和CnosDB的完整教程
  • 原文地址:https://blog.csdn.net/vaevaevae233/article/details/128056105