• SpringCloud-7.消息驱动(Spring Cloud Stream)


    目录

    一、概念

    1.1 为什么使用Stream?

    1.2 为什么Stream可以屏蔽底层差异?

    1.3 关于Binder

    1.4 Spring Cloud Stream标准流程

    1.5 常用API/注解

    二、案例演示

    2.1 生产者

    2.2 消费者

    2.3 分组消费与持久化

    2.3.1 环境准备

    2.3.2 分组

    2.3.3 持久化


    一、概念

            Spring Cloud Stream 是一个构建消息驱动微服务的框架。应用程序通过 inputs outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来进行binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

            Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅消费组分区的三个核心概念。目前仅支持RabbitMQ、Kafka

            一句话来说,就是屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

    1.1 为什么使用Stream?

            比如RabbitMQKafka,这些中间件架构上有差异。差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

            Stream中的消息通信方式遵循了发布-订阅模式Topic主题进行广播,在RabbitMQ就是Exchange,在Kakfa中就是Topic

    1.2 为什么Stream可以屏蔽底层差异?

            在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现
            通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离

    1.3 关于Binder

            Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

            Binder可以生成BindingBinding用来绑定消息容器的生产者消费者,它有两种类型,INPUTOUTPUTINPUT对应于消费者OUTPUT对应于生产者

    1.4 Spring Cloud Stream标准流程

     

            1.Binder-很方便的连接中间件,屏蔽差异

            2.Channel-是队列Queue的一种抽象,在消息通讯系统中就是实现存储转发媒介,通过Channel对队列进行配置

            3.SourceSink - 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出接受消息就是输入

    1.5 常用API/注解

    二、案例演示

    2.1 生产者

            1.建模块:cloud-stream-rabbitmq-provider8801

            2.POM

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-webartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>org.springframework.bootgroupId>
    8. <artifactId>spring-boot-starter-actuatorartifactId>
    9. dependency>
    10. <dependency>
    11. <groupId>org.springframework.cloudgroupId>
    12. <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
    13. dependency>
    14. <dependency>
    15. <groupId>org.springframework.cloudgroupId>
    16. <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    17. dependency>
    18. <dependency>
    19. <groupId>org.springframework.bootgroupId>
    20. <artifactId>spring-boot-devtoolsartifactId>
    21. <scope>runtimescope>
    22. <optional>trueoptional>
    23. dependency>
    24. <dependency>
    25. <groupId>org.projectlombokgroupId>
    26. <artifactId>lombokartifactId>
    27. <optional>trueoptional>
    28. dependency>
    29. <dependency>
    30. <groupId>org.springframework.bootgroupId>
    31. <artifactId>spring-boot-starter-testartifactId>
    32. <scope>testscope>
    33. dependency>
    34. dependencies>

            3.YML

    1. server:
    2. port: 8801
    3. spring:
    4. application:
    5. name: cloud-stream-provider
    6. cloud:
    7. stream:
    8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
    9. defaultRabbit: # 表示定义的名称,用于于binding整合
    10. type: rabbit # 消息组件类型
    11. environment: # 设置rabbitmq的相关的环境配置
    12. spring:
    13. rabbitmq:
    14. host: 192.168.80.128
    15. port: 5672
    16. username: admin
    17. password: 123
    18. bindings: # 服务的整合处理
    19. output: # 这个名字是一个通道的名称
    20. destination: studyExchange # 表示要使用的Exchange名称定义
    21. content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    22. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    23. eureka:
    24. client: # 客户端进行Eureka注册的配置
    25. service-url:
    26. defaultZone: http://localhost:7001/eureka
    27. instance:
    28. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    29. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    30. instance-id: send-8801.com # 在信息列表时显示主机名称
    31. prefer-ip-address: true # 访问的路径变为IP地址

            4.主启动

    1. @SpringBootApplication
    2. public class StreamMQMain8801
    3. {
    4. public static void main(String[] args)
    5. {
    6. SpringApplication.run(StreamMQMain8801.class,args);
    7. }
    8. }

            5.业务类

                    service接口

    1. public interface IMessageProvider {
    2. public String send();
    3. }

                    service实现类,注意@EnableBinding注解

    1. @EnableBinding(Source.class)//定义消息的推送管道
    2. public class MessageProviderImpl implements IMessageProvider {
    3. @Resource
    4. private MessageChannel output;
    5. @Override
    6. public String send() {
    7. String s = UUID.randomUUID().toString();
    8. output.send(MessageBuilder.withPayload(s).build());
    9. System.out.println("provider已发送消息...");
    10. return null;
    11. }
    12. }

                     controller

    1. @RestController
    2. public class SendMessageController {
    3. @Resource
    4. private IMessageProvider messageProvider;
    5. @GetMapping("/sendMessage")
    6. public String sendMessage(){
    7. return messageProvider.send();
    8. }
    9. }

            6.测试

                    在rabbit的管理界面中可以看到多了一个我们自定义的exchange

                    输入:http://localhost:8801/sendMessage 消息发送成功

    2.2 消费者

            1.建模块:cloud-stream-rabbitmq-consumer8802

            2.POM

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-webartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>org.springframework.cloudgroupId>
    8. <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
    9. dependency>
    10. <dependency>
    11. <groupId>org.springframework.cloudgroupId>
    12. <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    13. dependency>
    14. <dependency>
    15. <groupId>org.springframework.bootgroupId>
    16. <artifactId>spring-boot-starter-actuatorartifactId>
    17. dependency>
    18. <dependency>
    19. <groupId>org.springframework.bootgroupId>
    20. <artifactId>spring-boot-devtoolsartifactId>
    21. <scope>runtimescope>
    22. <optional>trueoptional>
    23. dependency>
    24. <dependency>
    25. <groupId>org.projectlombokgroupId>
    26. <artifactId>lombokartifactId>
    27. <optional>trueoptional>
    28. dependency>
    29. <dependency>
    30. <groupId>org.springframework.bootgroupId>
    31. <artifactId>spring-boot-starter-testartifactId>
    32. <scope>testscope>
    33. dependency>
    34. dependencies>

            3.YML

    1. server:
    2. port: 8802
    3. spring:
    4. application:
    5. name: cloud-stream-consumer
    6. cloud:
    7. stream:
    8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
    9. defaultRabbit: # 表示定义的名称,用于于binding整合
    10. type: rabbit # 消息组件类型
    11. environment: # 设置rabbitmq的相关的环境配置
    12. spring:
    13. rabbitmq:
    14. host: 192.168.80.128
    15. port: 5672
    16. username: admin
    17. password: 123
    18. bindings: # 服务的整合处理
    19. input: # 这个名字是一个通道的名称
    20. destination: studyExchange # 表示要使用的Exchange名称定义
    21. content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
    22. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    23. eureka:
    24. client: # 客户端进行Eureka注册的配置
    25. service-url:
    26. defaultZone: http://localhost:7001/eureka
    27. instance:
    28. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    29. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    30. instance-id: receive-8802.com # 在信息列表时显示主机名称
    31. prefer-ip-address: true # 访问的路径变为IP地址

            4.主启动

    1. @SpringBootApplication
    2. public class StreamMQMain8802
    3. {
    4. public static void main(String[] args)
    5. {
    6. SpringApplication.run(StreamMQMain8802.class,args);
    7. }
    8. }

            5.业务类,注意@EnableBinding、 @StreamListener注解

    1. @Component
    2. @EnableBinding(Sink.class)
    3. public class ReceiveMessageListener
    4. {
    5. @Value("${server.port}")
    6. private String serverPort;
    7. @StreamListener(Sink.INPUT)
    8. public void input(Message message)
    9. {
    10. System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"\t port: "+serverPort);
    11. }
    12. }

            6.测试

            输入:http://localhost:8801/sendMessage

            生产者端: 

    provider已发送消息...

            消费者端:

    消费者1号,------->接收到的消息:3aeca05a-d25e-4249-8e94-2ddb4d84a318     port: 8802

    2.3 分组消费与持久化

    2.3.1 环境准备

            现在我们再引入一个消费者:cloud-stream-rabbitmq-consumer8803

            1.POM

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-webartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>org.springframework.cloudgroupId>
    8. <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
    9. dependency>
    10. <dependency>
    11. <groupId>org.springframework.cloudgroupId>
    12. <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    13. dependency>
    14. <dependency>
    15. <groupId>org.springframework.bootgroupId>
    16. <artifactId>spring-boot-starter-actuatorartifactId>
    17. dependency>
    18. <dependency>
    19. <groupId>org.springframework.bootgroupId>
    20. <artifactId>spring-boot-devtoolsartifactId>
    21. <scope>runtimescope>
    22. <optional>trueoptional>
    23. dependency>
    24. <dependency>
    25. <groupId>org.projectlombokgroupId>
    26. <artifactId>lombokartifactId>
    27. <optional>trueoptional>
    28. dependency>
    29. <dependency>
    30. <groupId>org.springframework.bootgroupId>
    31. <artifactId>spring-boot-starter-testartifactId>
    32. <scope>testscope>
    33. dependency>
    34. dependencies>

            2.YML

    1. server:
    2. port: 8803
    3. spring:
    4. application:
    5. name: cloud-stream-consumer
    6. cloud:
    7. stream:
    8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
    9. defaultRabbit: # 表示定义的名称,用于于binding整合
    10. type: rabbit # 消息组件类型
    11. environment: # 设置rabbitmq的相关的环境配置
    12. spring:
    13. rabbitmq:
    14. host: 192.168.80.128
    15. port: 5672
    16. username: admin
    17. password: 123
    18. bindings: # 服务的整合处理
    19. input: # 这个名字是一个通道的名称
    20. destination: studyExchange # 表示要使用的Exchange名称定义
    21. content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
    22. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    23. eureka:
    24. client: # 客户端进行Eureka注册的配置
    25. service-url:
    26. defaultZone: http://localhost:7001/eureka
    27. instance:
    28. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    29. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    30. instance-id: receive-8803.com # 在信息列表时显示主机名称
    31. prefer-ip-address: true # 访问的路径变为IP地址

            3.主启动

    1. @SpringBootApplication
    2. public class StreamMQMain8803
    3. {
    4. public static void main(String[] args)
    5. {
    6. SpringApplication.run(StreamMQMain8803.class,args);
    7. }
    8. }

            4.业务类

    1. @Component
    2. @EnableBinding(Sink.class)
    3. public class ReceiveMessageListener
    4. {
    5. @Value("${server.port}")
    6. private String serverPort;
    7. @StreamListener(Sink.INPUT)
    8. public void input(Message message)
    9. {
    10. System.out.println("消费者2号,------->接收到的消息:" + message.getPayload()+"\t port: "+serverPort);
    11. }
    12. }

    2.3.2 分组

            如果这时候我们再让生产者发送信息,则消费者8002 消费者8003都收到了消息。这是我们不希望看到的结果,我们希望一个消息只能被消费一次

            这时我们就可以使用Stream中的消息分组来解决

            注意在Stream中处于同一个group中的多个消费者竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

            所以我们可以把这两个微服务放在一个组内,这样就避免了重复消费

            在分组前,可以在rabbitmq的后台看到下面的默认队列

             我们分别给80028003添加一个,YML种添加group属性,如下所示

    1. server:
    2. port: 8802
    3. spring:
    4. application:
    5. name: cloud-stream-consumer
    6. cloud:
    7. stream:
    8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
    9. defaultRabbit: # 表示定义的名称,用于于binding整合
    10. type: rabbit # 消息组件类型
    11. environment: # 设置rabbitmq的相关的环境配置
    12. spring:
    13. rabbitmq:
    14. host: 192.168.80.128
    15. port: 5672
    16. username: admin
    17. password: 123
    18. bindings: # 服务的整合处理
    19. input: # 这个名字是一个通道的名称,在分析具体源代码的时候会进行说明
    20. destination: studyExchange # 表示要使用的Exchange名称定义
    21. content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
    22. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    23. group: A
    24. eureka:
    25. client: # 客户端进行Eureka注册的配置
    26. service-url:
    27. defaultZone: http://localhost:7001/eureka
    28. instance:
    29. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    30. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    31. instance-id: receive-8802.com # 在信息列表时显示主机名称
    32. prefer-ip-address: true # 访问的路径变为IP地址
    1. server:
    2. port: 8803
    3. spring:
    4. application:
    5. name: cloud-stream-consumer
    6. cloud:
    7. stream:
    8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
    9. defaultRabbit: # 表示定义的名称,用于于binding整合
    10. type: rabbit # 消息组件类型
    11. environment: # 设置rabbitmq的相关的环境配置
    12. spring:
    13. rabbitmq:
    14. host: 192.168.80.128
    15. port: 5672
    16. username: admin
    17. password: 123
    18. bindings: # 服务的整合处理
    19. input: # 这个名字是一个通道的名称
    20. destination: studyExchange # 表示要使用的Exchange名称定义
    21. content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
    22. binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    23. group: B
    24. eureka:
    25. client: # 客户端进行Eureka注册的配置
    26. service-url:
    27. defaultZone: http://localhost:7001/eureka
    28. instance:
    29. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    30. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    31. instance-id: receive-8803.com # 在信息列表时显示主机名称
    32. prefer-ip-address: true # 访问的路径变为IP地址

             设置完成后进入rabbitmq后台,可以看到分组后队列

             如果希望80028003一个组,就是消息只被消费一次,修改两个的group名字一致即可

    2.3.3 持久化

            如果我们没有对某个实例分组,例如8002未分组。8003设置了组名”A“。主动把80028003宕掉,然后8001向他们发送消息。最后重启80028003,那么8003还会收到它离线那段时间8001发送的消息,而8002收不到了。

            其实再回头观察一下,如果用的是默认binder没有分组),其生成默认队列不是持久化的

            所以要想消息得到持久化,还是要设置好微服务实例分组

  • 相关阅读:
    用VS Code搞Qt6:编译源代码与基本配置
    sqli-labs部分关思路
    #DAYU200#OpenHarmony 视频播放器
    【21天学习挑战赛—经典算法】LeetCode 922. 按奇偶排序数组 II
    【紫光同创国产FPGA教程】——PDS安装教程
    java计算机毕业设计医院人事档案管理系源码+系统+mysql数据库+lw文档
    Java多并发(三)| 线程间的通信(ThreadLoacl详解)
    【Azure Developer】使用 Microsoft Graph API 获取 AAD User 操作示例
    洛谷 模板汇总 算法基础 python解析
    掌握Go语言:探索Go语言中的循环奇妙世界,从基础到实战(13)
  • 原文地址:https://blog.csdn.net/weixin_62427168/article/details/126493136