• Spring Cloud Stream的配置及使用——以RabbitMQ为例


    1. 简介

    https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html
    英语好的可以直接看官方文档,文档里讲的更全面
    RabbitMQ Binder
    By default, the RabbitMQ Binder implementation maps each destination to a TopicExchange. For each consumer group, a Queue is bound to that TopicExchange.

    上图是RabbitMQ Binder(绑定器)。默认情况下,绑定器实现将每一个destination映射到一个TopicExchange。对于每一个消费者组,都有一个队列绑定到那个TopicExchange。

    2. 依赖配置

    
        org.springframework.cloud
        spring-cloud-dependencies
        ${spring-cloud.version}
        pom
        import
    
    
        org.springframework.cloud
        spring-cloud-starter-stream-rabbit
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3. 生产者配置及消息发送

    3.1 yaml配置

    spring:
      cloud:
        stream:
          #      如果有一个binder的话,就不需要设置
          default-binder: rabbit
          binders:
            rabbit1:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.70.224
                    port: 5672
                    username: admin
                    password: 444944
                    virtual-host: GHost
            rabbit:
              type: rabbit
              defaultCandidate: false
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.70.167
                    port: 5672
                    username: admin
                    password: public
                    virtual-host: /
          bindings:
            order:
              binder: rabbit
              destination: order
              producer:
                #            默认是true
                autoStartup: true
            cart:
              binder: rabbit
              destination: cart
              routingKeyExpression: han
              producer:
                #            默认是true
                autoStartup: true
    # rabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    配置说明:

    • 在Spring Cloud Stream中可以配置多个binder,也就是可以配置连接多个MQ服务器
    • 在RabbitMQ中,binding的名称对应的是output和input的名称,destination对应mq中的exchange名称。上述配置会生成order,cart两个exchange
    • rabbitmq发送的消息默认routingKey为destination, 如果是分区的destination, 默认值为destination-,分区内容这里不涉及,具体内容看官方文档

    3.2 生产者声明

    public interface CartSource {
        /**
         * Name of the output channel.
         * cart对应的是binding,
         * destination对应的是rabbitmq里的exchange,kafka中的topic.
         * 如果没有设置destination, rabbitmq会自动创建一个和binding同名的exchange
         */
        String OUTPUT = "cart";
    
        /**
         * @return output channel
         */
        @Output(CartSource.OUTPUT)
        MessageChannel output();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    cloud stream中output表示发送,这里的cart与配置中的binding名称是对应的

    //必须添加,要不然无法注入。也可以加在启动类上,可以重复添加
    @EnableBinding(CartSource.class)
    @Component
    public class CartSender {
    
        @Autowired
        private CartSource orderSource;
    
        private static final Logger logger= LoggerFactory.getLogger(CartSender.class);
    
        public void pushMsg(Order order){
            logger.info("sending rabbitmq message:{}",order.toString());
            orderSource.output().send(MessageBuilder.withPayload(order).build());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3.3 消息发送

    @RestController
    @RequestMapping("mqTest1")
    public class MqTest1Controller {
        @Autowired
        CartSender cartSender;
        @GetMapping("streamPush")
        public String streamPush(){
            cartSender.pushMsg(new Order());
            return "hehe";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4. 消费者配置

    4.1 yaml配置

    spring:
      cloud:
        stream:
          #      如果有一个binder的话,就不需要设置
          default-binder: rabbit
          binders:
            rabbit1:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.70.224
                    port: 5672
                    username: admin
                    password: 444944
                    virtual-host: /
            rabbit:
              type: rabbit
              defaultCandidate: false
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.70.167
                    port: 5672
                    username: admin
                    password: public
                    virtual-host: /
          bindings:
            order:
              binder: rabbit
              destination: order
              group: myOrderQueue
              conumer:
                concurrency: 3
            cart:
              binder: rabbit
              destination: cart
              group: myCartQueue
              conumer:
                concurrency: 3
          # rabbit的扩展配置 RabbitExtendedBindingProperties
          rabbit:
            bindings:
    #          order:
    #            consumer:
    #              bindingRoutingKey: order-key
              cart:
                consumer:
                  #          如果没有指定routing key,会使用默认的 #
                  bindingRoutingKey: cart-key
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    配置说明:

    • 只有消费者才会创建队列,queue名称为:.
    • routingKey设置
      spring.cloud.stream.rabbit.bindings..consumer.bindingRoutingKey=myRoutingKey
      如果没有设置的话,默认为#

    在这里插入图片描述

    4.2 消费者声明与消息接收

    public interface CartSink {
    
        String INPUT = "cart";
    
        /**
         * @return input channel.
         */
        @Input(CartSink.INPUT)
        SubscribableChannel input();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    cloud stream中input表示接收,这里的cart与配置中的binding名称是对应的

    @EnableBinding(CartSink.class)
    public class CartHandler {
        private static final Logger logger = LoggerFactory.getLogger(CartHandler.class);
        /**
         * 参数也可以是对象,会自动将消息转换为对象
         * @param headers
         * @param payload
         */
        @StreamListener(CartSink.INPUT)
        public void loggerSink(@Headers MessageHeaders headers, byte[] payload){
            String cartChange=new String(payload);
            logger.info("cart change:{}",cartChange);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    debug设置
    创造财富的9种模式
    【论文阅读】【yolo系列】YOLO-Pose的论文阅读
    【css】背景换颜色
    bash -s 的作用
    Netty(一)NIO-基础
    从0写bootloader — Bootloader重定位APP
    架构师的 36 项修炼第07讲:高性能系统架构设计
    tkmybatis通用mapper实现在使用Example进行查询的几种方式
    基于SSM的IT运维管理系统
  • 原文地址:https://blog.csdn.net/m0_67391518/article/details/126496911