工程中新建三个子模块
在7001端口使用eureka注册服务中心(自行准备)
cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块
cloud-stream-rabbitmq-consumer8802,作为消息接收模块
cloud-stream-rabbitmq-consumer8803 作为消息接收模块
新建Module:cloud-stream-rabbitmq-provider8801
版本已交给父工程控制
-
-
- <properties>
- <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
- <maven.compiler.source>1.8maven.compiler.source>
- <maven.compiler.target>1.8maven.compiler.target>
- <junit.version>4.12junit.version>
- <log4j.version>1.2.17log4j.version>
- <lombok.version>1.16.18lombok.version>
- <mysql.version>8.0.16mysql.version>
- <druid.version>1.1.16druid.version>
- <mybatis.spring.boot.version>1.3.0mybatis.spring.boot.version>
- properties>
-
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>mscloudartifactId>
- <groupId>com.atguigu.springcloudgroupId>
- <version>1.0-SNAPSHOTversion>
- parent>
- <modelVersion>4.0.0modelVersion>
-
- <artifactId>cloud-stream-rabbitmq-provider8801artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-actuatorartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-stream-rabbitartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-devtoolsartifactId>
- <scope>runtimescope>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- dependency>
- dependencies>
-
- project>
- server:
- port: 8801
-
- spring:
- application:
- name: cloud-stream-provider
- cloud:
- stream:
- binders: # 在此处配置要绑定的rabbitmq的服务信息;
- defaultRabbit: # 表示定义的名称,用于于binding整合
- type: rabbit # 消息组件类型
- environment: # 设置rabbitmq的相关的环境配置
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
- bindings: # 服务的整合处理
- output: # 这个名字是一个通道的名称
- destination: studyExchange # 表示要使用的Exchange名称定义
- content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
- binder: defaultRabbit # 设置要绑定的消息服务的具体设置
-
- eureka:
- client: # 客户端进行Eureka注册的配置
- service-url:
- defaultZone: http://localhost:7001/eureka
- instance:
- lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
- lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
- instance-id: send-8801.com # 在信息列表时显示主机名称
- prefer-ip-address: true # 访问的路径变为IP地址
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
- @SpringBootApplication
- public class StreamMQMain8801
- {
- public static void main(String[] args)
- {
- SpringApplication.run(StreamMQMain8801.class,args);
- }
- }
发送消息接口
- public interface IMessageProvider
- {
- public String send() ;
- }
-
发送消息接口实现类
- @EnableBinding(Source.class) // 可以理解为是一个消息的发送管道的定义
- public class MessageProviderImpl implements IMessageProvider
- {
- @Resource
- private MessageChannel output; // 消息的发送管道
-
- @Override
- public String send()
- {
- String serial = UUID.randomUUID().toString();
- this.output.send(MessageBuilder.withPayload(serial).build()); // 创建并发送消息
- System.out.println("***serial: "+serial);
-
- return serial;
- }
- }
Controller
- @RestController
- public class SendMessageController
- {
- @Resource
- private IMessageProvider messageProvider;
-
- @GetMapping(value = "/sendMessage")
- public String sendMessage()
- {
- return messageProvider.send();
- }
- }
测试1:启动7001 8801服务
查看rabbitMQ界面查看
访问
http://localhost:8801/sendMessage
且可以在图形化界面中可以看到波峰起伏
新建Module:cloud-stream-rabbitmq-consumer8802
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>mscloudartifactId>
- <groupId>com.atguigu.springcloudgroupId>
- <version>1.0-SNAPSHOTversion>
- parent>
- <modelVersion>4.0.0modelVersion>
-
- <artifactId>cloud-stream-rabbitmq-provider8801artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-stream-rabbitartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-actuatorartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-devtoolsartifactId>
- <scope>runtimescope>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- <optional>trueoptional>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- dependency>
- dependencies>
-
- project>
- server:
- port: 8802
-
- spring:
- application:
- name: cloud-stream-consumer
- cloud:
- stream:
- binders: # 在此处配置要绑定的rabbitmq的服务信息;
- defaultRabbit: # 表示定义的名称,用于于binding整合
- type: rabbit # 消息组件类型
- environment: # 设置rabbitmq的相关的环境配置
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
- bindings: # 服务的整合处理
- input: # 这个名字是一个通道的名称
- destination: studyExchange # 表示要使用的Exchange名称定义
- content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
- binder: defaultRabbit # 设置要绑定的消息服务的具体设置
-
- eureka:
- client: # 客户端进行Eureka注册的配置
- service-url:
- defaultZone: http://localhost:7001/eureka
- instance:
- lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
- lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
- instance-id: receive-8802.com # 在信息列表时显示主机名称
- prefer-ip-address: true # 访问的路径变为IP地址
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
- @SpringBootApplication
- public class StreamMQMain8802
- {
- public static void main(String[] args)
- {
- SpringApplication.run(StreamMQMain8802.class,args);
- }
- }
- @Component
- @EnableBinding(Sink.class)
- public class ReceiveMessageListener
- {
- @Value("${server.port}")
- private String serverPort;
-
- @StreamListener(Sink.INPUT)
- public void input(Message
message) - {
- System.out.println("消费者1号,------->接收到的消息:" + message.getPayload()+"\t port: "+serverPort);
- }
- }
测试2:
启动8801 8802 7001
http://localhost:8801/sendMessage
8801控制台
8802控制台:
依照8802,clone出来一份运行8803
除端口号不一样外,其他的一致,即该端口号为8803
测试3:
启动7001 8803 8802 8801
访问
http://localhost:8801/sendMessage
可以看到由8801发送的消息被8802 8803都会消费
有重复消费问题
消息持久化问题
目前是8802/8803同时都收到了,存在重复消费问题
如何解决?分组和持久化属性group
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,
那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。
这时我们就可以使用Stream中的消息分组来解决

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),
同一组内会发生竞争关系,只有其中一个可以消费。
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。
不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

增加分组group:atguiguA

增加分组group:atguiguB
测试4:
重复测试3还是一样的结果
8802/8803实现轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。
将8803的yml文件中分组修改成为:
group: atguiguA
测试4:
重复测试3,在这次访问俩次 http://localhost:8801/sendMessage
8801控制台:
8802控制台:
8803控制台:
可以发现已经实现目的
通过上述,解决了重复消费问题,再看看持久化
修改:
停止8802/8803并去除掉8802的分组group: atguiguA
8803的分组group: atguiguA没有去掉
测试6:
8801先发送4条消息到rabbitmq,即访问http://localhost:8801/sendMessage四次
8801控制台:
先启动8802,无分组属性配置,后台没有打出来消息
8802控制台:
再启动8803,有分组属性配置,后台打出来了MQ上的消息