本文主要介绍如何在springcloud中通过spring-cloud-stream实现消息队列rabbitmq的消息生产和消费。本例使用的springcloud版本为:2021.0.3,springboot版本为:2.6.8。
本例通过创建一个生产者项目provide-stream-8011和两个消费者项目consumer-stream-8012、consumer-stream-8013来进行演示。
打开idea新建项目,选择maven,创建springboot项目provider-stream-8011。
在项目pom中引入spring-cloud-starter-netflix-eureka-client和spring-cloud-starter-stream-rabbit依赖:
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
-
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-stream-rabbitartifactId>
- dependency>
- dependencies>
在项目resources文件夹下创建application.yml文件,并按如下内容进行配置:
- server:
- port: 8011
-
- eureka:
- client:
- service-url:
- defaultZone: http://localhost:7001/eureka
- fetch-registry: true
- instance:
- instance-id: provider-stream-${server.port}
- prefer-ip-address: true
-
- spring:
- application:
- name: provider-stream
-
- #消息中间件配置
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: spring
- password: 123456
- cloud:
- stream:
- bindings:
- output-out-0: #通道名称
- destination: message #exchange
在项目src/main/java下创建主应用类ProviderStreamApplication.java,添加注解@EnableEurekaClient、@SpringBootApplication。
- @EnableEurekaClient
- @SpringBootApplication
- public class ProviderStreamApplication {
- public static void main(String[] args) {
- SpringApplication.run(ProviderStreamApplication.class, args);
- }
- }
创建SendMessageService类实现生产者发送消息功能。streamBridge为消息队列的桥接对象,send方法的第一个参数是通道的名称,第二个参数为发送的数据。
- @Service
- @Slf4j
- public class SendMessageService {
- @Resource
- private StreamBridge streamBridge;
-
- // 发送消息
- public void send(){
- String data = UUID.randomUUID().toString();
- log.info("发送的消息: {}", data);
- streamBridge.send("output-out-0", data);
- }
- }
创建SendMessageController类实现web的访问,并通过调用SendMessageService的send方法进行消息发送。
- @RestController
- @Slf4j
- public class SendMessageController {
- @Resource
- private SendMessageService sendMessageService;
-
- @GetMapping("/send")
- public void send(){
- sendMessageService.send();
- }
- }
打开idea新建项目,选择maven,创建springboot项目consumer-stream-8012。
在项目pom中引入spring-cloud-starter-netflix-eureka-client和spring-cloud-starter-stream-rabbit依赖:
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
-
- <dependency>
- <groupId>org.projectlombokgroupId>
- <artifactId>lombokartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-stream-rabbitartifactId>
- dependency>
- dependencies>
在项目resources文件夹下创建application.yml文件,并按如下内容进行配置:
- server:
- port: 8012
-
- eureka:
- client:
- service-url:
- defaultZone: http://localhost:7001/eureka
- fetch-registry: true
- instance:
- instance-id: consumer-stream-${server.port}
- prefer-ip-address: true
-
- spring:
- application:
- name: consumer-stream
-
- #消息中间件配置
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: spring
- password: 123456
- cloud:
- stream:
- bindings:
- input-in-0: #通道名称
- destination: message #exchange,需要与发送者的配置名称一致才能收到对应消息
- group: consumer #分组,同一组的消息只会被一个消费者消费,避免重复消费。
在项目src/main/java下创建主应用类 GatewayApplication.java,添加注解@EnableEurekaClient、@SpringBootApplication
- @SpringBootApplication
- @EnableEurekaClient
- public class ConsumerStreamApplication {
- public static void main(String[] args) {
- SpringApplication.run(ConsumerStreamApplication.class, args);
- }
- }
在config文件夹下创建配置类StreamConfig,并编写消费类bean实现消息消费。
- @Configuration
- @Slf4j
- public class StreamConfig {
- @Bean
- public Consumer
input() { - return (data) -> log.info("收到的消息:{}", data);
- }
- }
参考consumer-stream-8012项目进行创建即可。
同时启动并运行项目eueka-server-7001、provider-stream-8011、consumer-stream-8012、consumer-stream-8013。通过访问http://127.0.0.1:8011/send发送消息,可以分别在provider-stream-8011、consumer-stream-8012的控制台看到收到的消息日志。