• Spring Cloud Stream 消息驱动基础入门与实践总结


    Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。

    【1】概念介绍

    ① 什么是Spring Cloud Stream

    官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。简单来讲,就是屏蔽了底层XXMQ,应用层不用关注底层是RabbitMQ还是Kafka 。类似于Spring Data抽离持久层屏蔽底层各种数据库的概念。

    应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

    通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动

    官网文档:https://spring.io/projects/spring-cloud-stream#overview

    在这里插入图片描述

    ② stream如何统一底层差异

    在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。

    Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

    通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

    • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

    • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

    • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

    在这里插入图片描述

    ③ Spring Cloud Stream标准流程设计

    Stream中的消息通信方式遵循了发布-订阅模式。

    • Binder:很方便的连接中间件,屏蔽差异
    • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
    • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

    在这里插入图片描述

    ④ 几个API注解

    @EnableBinding:指信道channel和exchange绑定在一起。

    @StreamListener:监听队列,用于消费者的队列的消息接收

    @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。

    @Output:注解标识输出通道,发布的消息将通过该通道离开应用程序

    下面以RabbitMQ为底层MQ来说明如何使用Stream,当然同样要先安装好RabbitMQ。

    【2】消息生产者

    ① pom依赖

    <dependency>
         <groupId>org.springframework.cloudgroupId>
         <artifactId>spring-cloud-starter-stream-rabbitartifactId>
     dependency>
    

    ② yml配置

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          binders: #在此配置要绑定的 rabbitmq的服务信息
            defaultRabbit:  # 表示定义的名称,用于 binding整合
              type: rabbit  # 消息组件类型
              environment:  # 设置rabbitmq相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            output: # 输出通道的名称
              destination: studyExchange  #表示要使用的 Exchange 名称定义
              content-type: application/json  # 消息类型
              binder: defaultRabbit
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30s
        lease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90s
        instance-id: send-8001.com  #信息列表显示主机名称
        prefer-ip-address: true # 访问路径变为ip地址
    
    

    ③ 消息服务类

    public interface IMessageProvider {
        public String send();
    }
    
    @EnableBinding(Source.class)//定义消息推送管道
    @Slf4j
    public class IMessageProviderImpl implements IMessageProvider {
    
        @Resource
        private MessageChannel output;//消息发送通道
    
        @Override
        public String send() {
            String serial = UUID.randomUUID().toString();
            output.send(MessageBuilder.withPayload(serial).build());
            log.info(serial+"***********************");
            return serial;
        }
    }
    

    编写控制器发送消息:

    @RestController
    public class IMessageController {
        @Resource
        private IMessageProvider provider;
    
        @GetMapping("/sendMessage")
        public String send(){
            return provider.send();
        }
    }
    
    

    【3】消息消费者

    ① pom依赖

    <dependency>
         <groupId>org.springframework.cloudgroupId>
         <artifactId>spring-cloud-starter-stream-rabbitartifactId>
     dependency>
    

    ② yml配置

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: #在此配置要绑定的 rabbitmq的服务信息
            defaultRabbit:  # 表示定义的名称,用于 binding整合
              type: rabbit  # 消息组件类型
              environment:  # 设置rabbitmq相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            input: # 输出通道的名称
              destination: studyExchange  #表示要使用的 Exchange 名称定义
              content-type: application/json  # 消息类型
              binder: defaultRabbit
              group: group1
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30s
        lease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90s
        instance-id: receive-8002.com  #信息列表显示主机名称
        prefer-ip-address: true # 访问路径变为ip地址
    

    ③ 消息接收服务

    @Component
    @Slf4j
    @EnableBinding(Sink.class)
    public class StreamController {
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(Sink.INPUT)
        public void input(Message<String>message){
            log.info("消费者1号接收到消息"+message.getPayload()+"\t port:"+serverPort);
        }
    
    }
    

    上面【2】【3】即可实现消息的发送和接收,但是假设有多个消费者,我们还要考虑两个问题:消息的重复消费和消息的持久化。

    【4】group分组解决重复消费问题

    比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。

    在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

    • 不同组是可以全面消费的(重复消费),
    • 同一组内会发生竞争关系,只有其中一个可以消费。

    也就是说,两个消费者微服务的group定义为同一个,即可以解决重复消费问题

    【5】group分组解决消息丢失问题

    即,在你服务停机重启期间,消息在不断发送,而服务启动后并没有接收到发送的消息。这是由于你没有配置group属性导致的。

    解决方案:配置group属性

    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: #在此配置要绑定的 rabbitmq的服务信息
            defaultRabbit:  # 表示定义的名称,用于 binding整合
              type: rabbit  # 消息组件类型
              environment:  # 设置rabbitmq相关的环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            input: # 输出通道的名称
              destination: studyExchange  #表示要使用的 Exchange 名称定义
              content-type: application/json  # 消息类型
              binder: defaultRabbit
              group: group1 # 这个很重要!!!
    
  • 相关阅读:
    HBase学习笔记(3)—— HBase整合Phoenix
    SpringCloud-Config&Bus
    ROS1学习笔记:ROS命令行工具的使用(ubuntu20.04)
    【洛谷】P3368 【模板】树状数组 2 (单点修改+区间查询或区间修改+单点查询的作用)
    一种4g扫码付费通电控制器方案
    【博客474】为什么k8s控制面pod使用的ip是node ip,而非pod cidr中的ip
    FPGA系统性学习笔记连载_Day4 Xilinx ZYNQ7000系列 PS、PL、AXI 、启动流程基本概念篇
    docker-compose手册
    SSM SpringBoot vue智能手机参数分析平台
    【juc】future并行执行并获取返回值
  • 原文地址:https://blog.csdn.net/J080624/article/details/139644605