• springcloud_2021.0.3学习笔记:通过spring-cloud-stream实现消息队列rabbitmq的消息生产和消费


            本文主要介绍如何在springcloud中通过spring-cloud-stream实现消息队列rabbitmq的消息生产和消费。本例使用的springcloud版本为:2021.0.3,springboot版本为:2.6.8。

            本例通过创建一个生产者项目provide-stream-8011和两个消费者项目consumer-stream-8012、consumer-stream-8013来进行演示。

    1、生产者provider项目配置

    1.1 创建provider-stream-8011项目

            打开idea新建项目,选择maven,创建springboot项目provider-stream-8011。

    1.2、pom文件配置

             在项目pom中引入spring-cloud-starter-netflix-eureka-client和spring-cloud-starter-stream-rabbit依赖:

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-webartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>org.projectlombokgroupId>
    8. <artifactId>lombokartifactId>
    9. dependency>
    10. <dependency>
    11. <groupId>org.springframework.cloudgroupId>
    12. <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
    13. dependency>
    14. <dependency>
    15. <groupId>org.springframework.cloudgroupId>
    16. <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    17. dependency>
    18. dependencies>

     1.3、application.yml文件配置

              在项目resources文件夹下创建application.yml文件,并按如下内容进行配置:

    1. server:
    2. port: 8011
    3. eureka:
    4. client:
    5. service-url:
    6. defaultZone: http://localhost:7001/eureka
    7. fetch-registry: true
    8. instance:
    9. instance-id: provider-stream-${server.port}
    10. prefer-ip-address: true
    11. spring:
    12. application:
    13. name: provider-stream
    14. #消息中间件配置
    15. rabbitmq:
    16. host: 127.0.0.1
    17. port: 5672
    18. username: spring
    19. password: 123456
    20. cloud:
    21. stream:
    22. bindings:
    23. output-out-0: #通道名称
    24. destination: message #exchange

    1.4、主应用类配置  

            在项目src/main/java下创建主应用类ProviderStreamApplication.java,添加注解@EnableEurekaClient、@SpringBootApplication。

    1. @EnableEurekaClient
    2. @SpringBootApplication
    3. public class ProviderStreamApplication {
    4. public static void main(String[] args) {
    5. SpringApplication.run(ProviderStreamApplication.class, args);
    6. }
    7. }

    1.5 service层配置

            创建SendMessageService类实现生产者发送消息功能。streamBridge为消息队列的桥接对象,send方法的第一个参数是通道的名称,第二个参数为发送的数据。

    1. @Service
    2. @Slf4j
    3. public class SendMessageService {
    4. @Resource
    5. private StreamBridge streamBridge;
    6. // 发送消息
    7. public void send(){
    8. String data = UUID.randomUUID().toString();
    9. log.info("发送的消息: {}", data);
    10. streamBridge.send("output-out-0", data);
    11. }
    12. }

    1.6 controller层配置

            创建SendMessageController类实现web的访问,并通过调用SendMessageService的send方法进行消息发送。

    1. @RestController
    2. @Slf4j
    3. public class SendMessageController {
    4. @Resource
    5. private SendMessageService sendMessageService;
    6. @GetMapping("/send")
    7. public void send(){
    8. sendMessageService.send();
    9. }
    10. }

    2、消费者consumer项目配置

    2.1 创建consumer-stream-8012项目

            打开idea新建项目,选择maven,创建springboot项目consumer-stream-8012。

    2.2、pom文件配置

            在项目pom中引入spring-cloud-starter-netflix-eureka-client和spring-cloud-starter-stream-rabbit依赖:

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.bootgroupId>
    4. <artifactId>spring-boot-starter-webartifactId>
    5. dependency>
    6. <dependency>
    7. <groupId>org.projectlombokgroupId>
    8. <artifactId>lombokartifactId>
    9. dependency>
    10. <dependency>
    11. <groupId>org.springframework.cloudgroupId>
    12. <artifactId>spring-cloud-starter-netflix-eureka-clientartifactId>
    13. dependency>
    14. <dependency>
    15. <groupId>org.springframework.cloudgroupId>
    16. <artifactId>spring-cloud-starter-stream-rabbitartifactId>
    17. dependency>
    18. dependencies>

    2.3、application.yml文件配置

            在项目resources文件夹下创建application.yml文件,并按如下内容进行配置:

    1. server:
    2. port: 8012
    3. eureka:
    4. client:
    5. service-url:
    6. defaultZone: http://localhost:7001/eureka
    7. fetch-registry: true
    8. instance:
    9. instance-id: consumer-stream-${server.port}
    10. prefer-ip-address: true
    11. spring:
    12. application:
    13. name: consumer-stream
    14. #消息中间件配置
    15. rabbitmq:
    16. host: 127.0.0.1
    17. port: 5672
    18. username: spring
    19. password: 123456
    20. cloud:
    21. stream:
    22. bindings:
    23. input-in-0: #通道名称
    24. destination: message #exchange,需要与发送者的配置名称一致才能收到对应消息
    25. group: consumer #分组,同一组的消息只会被一个消费者消费,避免重复消费。

    2.4、主应用类配置  

            在项目src/main/java下创建主应用类 GatewayApplication.java,添加注解@EnableEurekaClient、@SpringBootApplication

    1. @SpringBootApplication
    2. @EnableEurekaClient
    3. public class ConsumerStreamApplication {
    4. public static void main(String[] args) {
    5. SpringApplication.run(ConsumerStreamApplication.class, args);
    6. }
    7. }

    2.5 消费类配置

            在config文件夹下创建配置类StreamConfig,并编写消费类bean实现消息消费。

    1. @Configuration
    2. @Slf4j
    3. public class StreamConfig {
    4. @Bean
    5. public Consumer input() {
    6. return (data) -> log.info("收到的消息:{}", data);
    7. }
    8. }

    2.6 创建consumer-stream-8013项目

            参考consumer-stream-8012项目进行创建即可。

    3、测试验证

            同时启动并运行项目eueka-server-7001、provider-stream-8011、consumer-stream-8012、consumer-stream-8013。通过访问http://127.0.0.1:8011/send发送消息,可以分别在provider-stream-8011、consumer-stream-8012的控制台看到收到的消息日志。


    新时代农民工 

  • 相关阅读:
    WPS通过“文档部件”的“域”设置图、表和公式的自动序列号
    微服务学习第四十四节 Sentinel整合GateWay
    nginx localtion使用正则匹配导致301 循环
    Prometheus 基本概念
    关于vue混入(mixin)的解读
    FreeRTOS 基于 ARMv8-M 对 MPU 的应用
    深度学习基础--神经网络(4)参数更新策略,梯度法
    深度学习-房价预测案例
    其他重要协议(DNS,ICMP,NAT,交换机)
    数据结构——二叉树1
  • 原文地址:https://blog.csdn.net/sg_knight/article/details/126360668