• SpringCloud Stream消息驱动代码实战


    工程中新建三个子模块

    在7001端口使用eureka注册服务中心(自行准备)

    cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块

    cloud-stream-rabbitmq-consumer8802,作为消息接收模块

    cloud-stream-rabbitmq-consumer8803  作为消息接收模块

    消息驱动之生产者

    新建Module:cloud-stream-rabbitmq-provider8801

    pom文件

    版本已交给父工程控制

    1. <properties>
    2. <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
    3. <maven.compiler.source>1.8maven.compiler.source>
    4. <maven.compiler.target>1.8maven.compiler.target>
    5. <junit.version>4.12junit.version>
    6. <log4j.version>1.2.17log4j.version>
    7. <lombok.version>1.16.18lombok.version>
    8. <mysql.version>8.0.16mysql.version>
    9. <druid.version>1.1.16druid.version>
    10. <mybatis.spring.boot.version>1.3.0mybatis.spring.boot.version>
    11. properties>
    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <parent>
    6. <artifactId>mscloudartifactId>
    7. <groupId>com.atguigu.springcloudgroupId>
    8. <version>1.0-SNAPSHOTversion>
    9. parent>
    10. <modelVersion>4.0.0modelVersion>
    11. <artifactId>cloud-stream-rabbitmq-provider8801artifactId>
    12. <dependencies>
    13. <dependency>
    14. <groupId>org.springframework.bootgroupId>
    15. <artifactId>spring-boot-starter-webartifactId>
    16. dependency>
    17. <dependency>
    18. <groupId>org.springframework.bootgroupId>
    19. <artifactId>spring-boot-starter-actuatorartifactId>
    20. dependency>
    21. <dependency>
    22. <groupId>org.springframework.cloudgroupId>
    23. <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
    24. dependency>
    25. <dependency>
    26. <groupId>org.springframework.cloudgroupId>
    27. <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    28. dependency>
    29. <dependency>
    30. <groupId>org.springframework.bootgroupId>
    31. <artifactId>spring-boot-devtoolsartifactId>
    32. <scope>runtimescope>
    33. <optional>trueoptional>
    34. dependency>
    35. <dependency>
    36. <groupId>org.projectlombokgroupId>
    37. <artifactId>lombokartifactId>
    38. <optional>trueoptional>
    39. dependency>
    40. <dependency>
    41. <groupId>org.springframework.bootgroupId>
    42. <artifactId>spring-boot-starter-testartifactId>
    43. <scope>testscope>
    44. dependency>
    45. dependencies>
    46. project>

    yml文件 

    1. server:
    2. port: 8801
    3. spring:
    4. application:
    5. name: cloud-stream-provider
    6. cloud:
    7. stream:
    8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
    9. defaultRabbit: # 表示定义的名称,用于于binding整合
    10. type: rabbit # 消息组件类型
    11. environment: # 设置rabbitmq的相关的环境配置
    12. spring:
    13. rabbitmq:
    14. host: localhost
    15. port: 5672
    16. username: guest
    17. password: guest
    18. bindings: # 服务的整合处理
    19. output: # 这个名字是一个通道的名称
    20. destination: studyExchange # 表示要使用的Exchange名称定义
    21. content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    22. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    23. eureka:
    24. client: # 客户端进行Eureka注册的配置
    25. service-url:
    26. defaultZone: http://localhost:7001/eureka
    27. instance:
    28. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    29. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    30. instance-id: send-8801.com # 在信息列表时显示主机名称
    31. prefer-ip-address: true # 访问的路径变为IP地址

      cloud:
          stream:
            binders: # 在此处配置要绑定的rabbitmq的服务信息;
              defaultRabbit: # 表示定义的名称,用于于binding整合
                type: rabbit # 消息组件类型
                environment: # 设置rabbitmq的相关的环境配置
                  spring:
                    rabbitmq:
                      host: localhost
                      port: 5672
                      username: guest
                      password: guest
            bindings: # 服务的整合处理
              output: # 这个名字是一个通道的名称
                destination: studyExchange # 表示要使用的Exchange名称定义
                content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
                binder: defaultRabbit # 设置要绑定的消息服务的具体设置 

     主启动类

    1. @SpringBootApplication
    2. public class StreamMQMain8801
    3. {
    4. public static void main(String[] args)
    5. {
    6. SpringApplication.run(StreamMQMain8801.class,args);
    7. }
    8. }

    业务类 

    发送消息接口

    1. public interface IMessageProvider
    2. {
    3. public String send() ;
    4. }

    发送消息接口实现类

    1. @EnableBinding(Source.class) // 可以理解为是一个消息的发送管道的定义
    2. public class MessageProviderImpl implements IMessageProvider
    3. {
    4. @Resource
    5. private MessageChannel output; // 消息的发送管道
    6. @Override
    7. public String send()
    8. {
    9. String serial = UUID.randomUUID().toString();
    10. this.output.send(MessageBuilder.withPayload(serial).build()); // 创建并发送消息
    11. System.out.println("***serial: "+serial);
    12. return serial;
    13. }
    14. }

    Controller

    1. @RestController
    2. public class SendMessageController
    3. {
    4. @Resource
    5. private IMessageProvider messageProvider;
    6. @GetMapping(value = "/sendMessage")
    7. public String sendMessage()
    8. {
    9. return messageProvider.send();
    10. }
    11. }

    测试1:启动7001 8801服务

    查看rabbitMQ界面查看 

    访问

    http://localhost:8801/sendMessage 

    且可以在图形化界面中可以看到波峰起伏 

     消息驱动之消费者

    新建Module:cloud-stream-rabbitmq-consumer8802

    pom文件

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <parent>
    6. <artifactId>mscloudartifactId>
    7. <groupId>com.atguigu.springcloudgroupId>
    8. <version>1.0-SNAPSHOTversion>
    9. parent>
    10. <modelVersion>4.0.0modelVersion>
    11. <artifactId>cloud-stream-rabbitmq-provider8801artifactId>
    12. <dependencies>
    13. <dependency>
    14. <groupId>org.springframework.bootgroupId>
    15. <artifactId>spring-boot-starter-webartifactId>
    16. dependency>
    17. <dependency>
    18. <groupId>org.springframework.cloudgroupId>
    19. <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
    20. dependency>
    21. <dependency>
    22. <groupId>org.springframework.cloudgroupId>
    23. <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    24. dependency>
    25. <dependency>
    26. <groupId>org.springframework.bootgroupId>
    27. <artifactId>spring-boot-starter-actuatorartifactId>
    28. dependency>
    29. <dependency>
    30. <groupId>org.springframework.bootgroupId>
    31. <artifactId>spring-boot-devtoolsartifactId>
    32. <scope>runtimescope>
    33. <optional>trueoptional>
    34. dependency>
    35. <dependency>
    36. <groupId>org.projectlombokgroupId>
    37. <artifactId>lombokartifactId>
    38. <optional>trueoptional>
    39. dependency>
    40. <dependency>
    41. <groupId>org.springframework.bootgroupId>
    42. <artifactId>spring-boot-starter-testartifactId>
    43. <scope>testscope>
    44. dependency>
    45. dependencies>
    46. project>

    yml文件

    1. server:
    2. port: 8802
    3. spring:
    4. application:
    5. name: cloud-stream-consumer
    6. cloud:
    7. stream:
    8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
    9. defaultRabbit: # 表示定义的名称,用于于binding整合
    10. type: rabbit # 消息组件类型
    11. environment: # 设置rabbitmq的相关的环境配置
    12. spring:
    13. rabbitmq:
    14. host: localhost
    15. port: 5672
    16. username: guest
    17. password: guest
    18. bindings: # 服务的整合处理
    19. input: # 这个名字是一个通道的名称
    20. destination: studyExchange # 表示要使用的Exchange名称定义
    21. content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
    22. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    23. eureka:
    24. client: # 客户端进行Eureka注册的配置
    25. service-url:
    26. defaultZone: http://localhost:7001/eureka
    27. instance:
    28. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    29. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    30. instance-id: receive-8802.com # 在信息列表时显示主机名称
    31. prefer-ip-address: true # 访问的路径变为IP地址

      cloud:
          stream:
            binders: # 在此处配置要绑定的rabbitmq的服务信息;
              defaultRabbit: # 表示定义的名称,用于于binding整合
                type: rabbit # 消息组件类型
                environment: # 设置rabbitmq的相关的环境配置
                  spring:
                    rabbitmq:
                      host: localhost
                      port: 5672
                      username: guest
                      password: guest
            bindings: # 服务的整合处理
              input: # 这个名字是一个通道的名称
                destination: studyExchange # 表示要使用的Exchange名称定义
                content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
                binder: defaultRabbit # 设置要绑定的消息服务的具体设置

    主启动类

    1. @SpringBootApplication
    2. public class StreamMQMain8802
    3. {
    4. public static void main(String[] args)
    5. {
    6. SpringApplication.run(StreamMQMain8802.class,args);
    7. }
    8. }

    业务类

    1. @Component
    2. @EnableBinding(Sink.class)
    3. public class ReceiveMessageListener
    4. {
    5. @Value("${server.port}")
    6. private String serverPort;
    7. @StreamListener(Sink.INPUT)
    8. public void input(Message message)
    9. {
    10. System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"\t port: "+serverPort);
    11. }
    12. }

     测试2:

    启动8801 8802 7001

    http://localhost:8801/sendMessage

    8801控制台

     8802控制台:

    cloud-stream-rabbitmq-consumer8803

    依照8802,clone出来一份运行8803

    除端口号不一样外,其他的一致,即该端口号为8803

    测试3:

    启动7001 8803 8802 8801 

    访问

    http://localhost:8801/sendMessage

    可以看到由8801发送的消息被8802 8803都会消费

    运行后有两个问题

    有重复消费问题

    消息持久化问题

    重复消费问题

    目前是8802/8803同时都收到了,存在重复消费问题

    如何解决?分组和持久化属性group

    比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,
    那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。
    这时我们就可以使用Stream中的消息分组来解决

    注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
    不同组是可以全面消费的(重复消费),
    同一组内会发生竞争关系,只有其中一个可以消费。

    操作原理,进行操作

    微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。
    不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

    修改8801的yml文件:

    增加分组group:atguiguA 

     修改8802的yml文件:

     增加分组group:atguiguB

    测试4:

    重复测试3还是一样的结果

    现在的目的:

    8802/8803实现轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

    将8803的yml文件中分组修改成为:

      group: atguiguA

    测试4:

    重复测试3,在这次访问俩次 http://localhost:8801/sendMessage

    8801控制台:

     8802控制台:

    8803控制台:

     可以发现已经实现目的

    持久化 

     通过上述,解决了重复消费问题,再看看持久化

    修改:

    停止8802/8803并去除掉8802的分组group: atguiguA

    8803的分组group: atguiguA没有去掉

    测试6:

    8801先发送4条消息到rabbitmq,即访问http://localhost:8801/sendMessage四次

    8801控制台:

    先启动8802,无分组属性配置,后台没有打出来消息

    8802控制台:

    再启动8803,有分组属性配置,后台打出来了MQ上的消息

  • 相关阅读:
    V4L2框架
    AI生图王者之战!深度体验实测,谁是真正的艺术家?
    安装Pymc3模块包问题记录
    【Vue3】图片未加载成功前占位
    修改aapt和自定义资源ID
    技术学习:Python(08)|操作MySQL
    使DAO具有粘性的四个因素
    移动硬盘有文件但看不见怎么恢复文件
    【MATLAB教程案例17】基于NSGAII多目标优化算法的matlab仿真及应用
    嵌入式Linux应用开发基础知识(三)——Makefile入门
  • 原文地址:https://blog.csdn.net/m0_62436868/article/details/126375950