目录
Spring Cloud Stream 是一个构建消息驱动微服务的框架。应用程序通过 inputs 或outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来进行binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。
一句话来说,就是屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
比如RabbitMQ和Kafka,这些中间件架构上有差异。差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
Stream中的消息通信方式遵循了发布-订阅模式,Topic主题进行广播,在RabbitMQ就是Exchange,在Kakfa中就是Topic。
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

1.Binder-很方便的连接中间件,屏蔽差异
2.Channel-是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
3.Source和Sink - 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

1.建模块:cloud-stream-rabbitmq-provider8801
2.POM
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-actuatorartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-stream-rabbitartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-devtoolsartifactId>
- <scope>runtimescope>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- dependency>
- dependencies>
3.YML
- server:
- port: 8801
-
- spring:
- application:
- name: cloud-stream-provider
- cloud:
- stream:
- binders: # 在此处配置要绑定的rabbitmq的服务信息;
- defaultRabbit: # 表示定义的名称,用于于binding整合
- type: rabbit # 消息组件类型
- environment: # 设置rabbitmq的相关的环境配置
- spring:
- rabbitmq:
- host: 192.168.80.128
- port: 5672
- username: admin
- password: 123
- bindings: # 服务的整合处理
- output: # 这个名字是一个通道的名称
- destination: studyExchange # 表示要使用的Exchange名称定义
- content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
- binder: defaultRabbit # 设置要绑定的消息服务的具体设置
-
- eureka:
- client: # 客户端进行Eureka注册的配置
- service-url:
- defaultZone: http://localhost:7001/eureka
- instance:
- lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
- lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
- instance-id: send-8801.com # 在信息列表时显示主机名称
- prefer-ip-address: true # 访问的路径变为IP地址
4.主启动
- @SpringBootApplication
- public class StreamMQMain8801
- {
- public static void main(String[] args)
- {
- SpringApplication.run(StreamMQMain8801.class,args);
- }
- }
5.业务类
service接口
- public interface IMessageProvider {
- public String send();
- }
service实现类,注意@EnableBinding注解
- @EnableBinding(Source.class)//定义消息的推送管道
- public class MessageProviderImpl implements IMessageProvider {
- @Resource
- private MessageChannel output;
-
- @Override
- public String send() {
- String s = UUID.randomUUID().toString();
- output.send(MessageBuilder.withPayload(s).build());
- System.out.println("provider已发送消息...");
- return null;
- }
- }
controller
- @RestController
- public class SendMessageController {
- @Resource
- private IMessageProvider messageProvider;
- @GetMapping("/sendMessage")
- public String sendMessage(){
- return messageProvider.send();
- }
- }
6.测试
在rabbit的管理界面中可以看到多了一个我们自定义的exchange
输入:http://localhost:8801/sendMessage 消息发送成功
1.建模块:cloud-stream-rabbitmq-consumer8802
2.POM
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-stream-rabbitartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-actuatorartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-devtoolsartifactId>
- <scope>runtimescope>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- dependency>
- dependencies>
3.YML
- server:
- port: 8802
-
- spring:
- application:
- name: cloud-stream-consumer
- cloud:
- stream:
- binders: # 在此处配置要绑定的rabbitmq的服务信息;
- defaultRabbit: # 表示定义的名称,用于于binding整合
- type: rabbit # 消息组件类型
- environment: # 设置rabbitmq的相关的环境配置
- spring:
- rabbitmq:
- host: 192.168.80.128
- port: 5672
- username: admin
- password: 123
- bindings: # 服务的整合处理
- input: # 这个名字是一个通道的名称
- destination: studyExchange # 表示要使用的Exchange名称定义
- content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
- binder: defaultRabbit # 设置要绑定的消息服务的具体设置
-
- eureka:
- client: # 客户端进行Eureka注册的配置
- service-url:
- defaultZone: http://localhost:7001/eureka
- instance:
- lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
- lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
- instance-id: receive-8802.com # 在信息列表时显示主机名称
- prefer-ip-address: true # 访问的路径变为IP地址
4.主启动
- @SpringBootApplication
- public class StreamMQMain8802
- {
- public static void main(String[] args)
- {
- SpringApplication.run(StreamMQMain8802.class,args);
- }
- }
5.业务类,注意@EnableBinding、 @StreamListener注解
- @Component
- @EnableBinding(Sink.class)
- public class ReceiveMessageListener
- {
- @Value("${server.port}")
- private String serverPort;
-
- @StreamListener(Sink.INPUT)
- public void input(Message
message) - {
- System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"\t port: "+serverPort);
- }
- }
6.测试
输入:http://localhost:8801/sendMessage
生产者端:
provider已发送消息...
消费者端:
消费者1号,------->接收到的消息:3aeca05a-d25e-4249-8e94-2ddb4d84a318 port: 8802
现在我们再引入一个消费者:cloud-stream-rabbitmq-consumer8803
1.POM
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-stream-rabbitartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-actuatorartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-devtoolsartifactId>
- <scope>runtimescope>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- dependency>
- dependencies>
2.YML
- server:
- port: 8803
-
- spring:
- application:
- name: cloud-stream-consumer
- cloud:
- stream:
- binders: # 在此处配置要绑定的rabbitmq的服务信息;
- defaultRabbit: # 表示定义的名称,用于于binding整合
- type: rabbit # 消息组件类型
- environment: # 设置rabbitmq的相关的环境配置
- spring:
- rabbitmq:
- host: 192.168.80.128
- port: 5672
- username: admin
- password: 123
- bindings: # 服务的整合处理
- input: # 这个名字是一个通道的名称
- destination: studyExchange # 表示要使用的Exchange名称定义
- content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
- binder: defaultRabbit # 设置要绑定的消息服务的具体设置
-
- eureka:
- client: # 客户端进行Eureka注册的配置
- service-url:
- defaultZone: http://localhost:7001/eureka
- instance:
- lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
- lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
- instance-id: receive-8803.com # 在信息列表时显示主机名称
- prefer-ip-address: true # 访问的路径变为IP地址
3.主启动
- @SpringBootApplication
- public class StreamMQMain8803
- {
- public static void main(String[] args)
- {
- SpringApplication.run(StreamMQMain8803.class,args);
- }
- }
4.业务类
- @Component
- @EnableBinding(Sink.class)
- public class ReceiveMessageListener
- {
- @Value("${server.port}")
- private String serverPort;
-
- @StreamListener(Sink.INPUT)
- public void input(Message
message) - {
- System.out.println("消费者2号,------->接收到的消息:" + message.getPayload()+"\t port: "+serverPort);
- }
- }
如果这时候我们再让生产者发送信息,则消费者8002 和消费者8003都收到了消息。这是我们不希望看到的结果,我们希望一个消息只能被消费一次。
这时我们就可以使用Stream中的消息分组来解决

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。
所以我们可以把这两个微服务放在一个组内,这样就避免了重复消费
在分组前,可以在rabbitmq的后台看到下面的默认队列:

我们分别给8002、8003添加一个组,YML种添加group属性,如下所示
- server:
- port: 8802
-
- spring:
- application:
- name: cloud-stream-consumer
- cloud:
- stream:
- binders: # 在此处配置要绑定的rabbitmq的服务信息;
- defaultRabbit: # 表示定义的名称,用于于binding整合
- type: rabbit # 消息组件类型
- environment: # 设置rabbitmq的相关的环境配置
- spring:
- rabbitmq:
- host: 192.168.80.128
- port: 5672
- username: admin
- password: 123
- bindings: # 服务的整合处理
- input: # 这个名字是一个通道的名称,在分析具体源代码的时候会进行说明
- destination: studyExchange # 表示要使用的Exchange名称定义
- content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
- binder: defaultRabbit # 设置要绑定的消息服务的具体设置
- group: A
- eureka:
- client: # 客户端进行Eureka注册的配置
- service-url:
- defaultZone: http://localhost:7001/eureka
- instance:
- lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
- lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
- instance-id: receive-8802.com # 在信息列表时显示主机名称
- prefer-ip-address: true # 访问的路径变为IP地址
- server:
- port: 8803
-
- spring:
- application:
- name: cloud-stream-consumer
- cloud:
- stream:
- binders: # 在此处配置要绑定的rabbitmq的服务信息;
- defaultRabbit: # 表示定义的名称,用于于binding整合
- type: rabbit # 消息组件类型
- environment: # 设置rabbitmq的相关的环境配置
- spring:
- rabbitmq:
- host: 192.168.80.128
- port: 5672
- username: admin
- password: 123
- bindings: # 服务的整合处理
- input: # 这个名字是一个通道的名称
- destination: studyExchange # 表示要使用的Exchange名称定义
- content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
- binder: defaultRabbit # 设置要绑定的消息服务的具体设置
- group: B
-
- eureka:
- client: # 客户端进行Eureka注册的配置
- service-url:
- defaultZone: http://localhost:7001/eureka
- instance:
- lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
- lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
- instance-id: receive-8803.com # 在信息列表时显示主机名称
- prefer-ip-address: true # 访问的路径变为IP地址
设置完成后进入rabbitmq后台,可以看到分组后的队列

如果希望8002、8003在一个组,就是消息只被消费一次,修改两个的group名字一致即可
如果我们没有对某个实例分组,例如8002未分组。8003设置了组名”A“。主动把8002、8003宕掉,然后8001向他们发送消息。最后重启8002、8003,那么8003还会收到它离线那段时间8001发送的消息,而8002就收不到了。
其实再回头观察一下,如果用的是默认的binder(没有分组),其生成的默认队列不是持久化的:

所以要想消息得到持久化,还是要设置好微服务实例的分组。