• Spring Cloud Stream 消息驱动微服务 相关图形,dljd,cat - spring cloud alibaba,p153完成


    一 业务

    1. 多个微服务

    二 需求

    1. 多个微服务之间需要通过消息中间件互相传递消息。
    2. 在面对不同的MQ产品(RabbitMQ、Kafka、RocketMQ)时,做到消息中间件和微服务的代码之间耦合性降到最低,甚至任意切换消息中间件而不用动微服务的代码。

    三 解决方案

    1 功能“集”:降低微服务和消息中间件的耦合性

    2 “神”冰箱:spring cloud stream整合消息中间件(RabbitMQ、Kafka、RocketMQ)

    四 完成学习

    1 思想、思路

    1. 按照官方的定义,Spring Cloud Stream 是一个构建消息驱动微服务的框架
      1.  ​​​​​​Spring Cloud Stream解决了开发人员无感知的使用消息中间件的问题。Spring Cloud Stream对消息中间件的进一步封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为rocketmq或者kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程;
        1. String cloud stream对各个MQ产品(RabbitMQ、Kafka、RocketMQ)的连接和各种操作,做了一层高度的抽象。在String cloud stream的加持下,我们可任意选用某一种消息中间件而不用修改微服务的代码。
        2. 相当于ORM框架(hibernate、mybatis),对底层的多种数据库产品的连接和各种操作做了一层高度的抽象。在ORM框架的加持下,我们可任意选用某一种数据库而不用修改代码。   
    2. Spring Cloud Stream官方只支持rabbitmq 和 kafka,spring cloud alibaba新写了一个starter可以使Spring Cloud Stream支持RocketMQ;   

    2 体系组织

    1. spring cloud stream是spring cloud提供的一个子项目(或称   框架)。
    2. application core是我们写的微服务的代码,可以有多个application core,即有多个微服务。
      1. output:微服务发消息到消息中间件
      2. input:微服务接收消息中间件的消息。
    3. binder:绑定器
      1. spring cloud stream子项目(框架)给我们封装的东西叫binder。
      2. spring cloud stream封装的binder对微服务和消息中间件之间发消息和接消息的操作做了高度的抽象、封装。
      3. binder用于创建binding,从而绑定具体的消息中间件。
      4. 消息中间件都有自己的Binder实现;比如Kafka 的实现KafkaMessageChannelBinder,RabbitMQ的实现RabbitMessageChannelBinder以及RocketMQ 的实现 RocketMQMessageChannelBinder;

    4. binding
      1. 包括 Input Binding 和 Output Binding;

      2. Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触;

    5. 常用的消息中间件:kafka、rabbimq、rokectmq
    6. 开发过程中使用到的类、注解:

      组成

      说明

      Binder 绑定器(各个消息中间件的绑定器不同)

      Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现;

      @Input 注解,表明是一个输入管道

      该注解标识输入通道,通过该输入通道接收消息进入应用程序

      @Output 注解,表明是一个输出管道

      该注解标识输出通道,发布的消息将通过该通道离开应用程序

      @StreamListener 注解

      监听队列,用于消费者的队列的消息接收

      @EnableBinding 注解,开启(Enable)绑定功能

      将信道channel和exchange、topic绑定在一起

    3 工作流程

    1. 第一步:Pom.xml引入依赖
    2. 第二步:application.properties配置
      1. ➌Binder
        1. RocketMQ服务器地址
      2. ➍Bindings
        1. input的3要素键值对:String:BindingProperties对象的属性(这里的string是input)
        2. output的3要素键值对:String:BindingProperties对象的属性(这里的string是output)
    3. 第三步:编码
      1. ➊Source
        1. ➋输出信道,发送消费
      2. ➊Sink
        1. ➋输入信道,消费消息
    4. 第四步:验证

    4 源码分析:弊端

    1. 组id是com.alibaba.cloud
      1. pom.xml中依赖是:spring-cloud-starter-rocketmq

        1. 对应的jar包是:com.alibaba.cloud.spring-cloud-starter-stream-rocketmq.jar
          1. 自动装配解析:META-INF/spring.factories
            org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
            com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration
            1. 自动装配的类:RocketMQComponent4BinderAutoConfiguration.java
              1. RocketMQ的自动装配:@AutoConfigureAfter({RocketMQAutoConfiguration.class})
                1. RocketMQ属性的配置:@EnableConfigurationProperties({RocketMQProperties.class})
                  1. 命名服务的地址:nameServer
                  2. 生产者:producer
          2. 源码解析:

            1. Binder属性配置:RocketMQBinderConfigurationProperties.java
              applicatoin.properties中

              # 客户端接入点,必填
              spring.cloud.stream.rocketmq.binder.name-server=192.168.172.128:9876
              对应的源码,如下图所示

            2. Bindings属性配置:RocketMQBindingProperties.java
              1. 生产者Bindings属性配置
              2. 消费者Bindings属性配置
            3. 消费者属性配置:RocketMQConsumerProperties.java
              1. 标签属性配置
              2. sql属性配置
              3. 集群方式:广播模式 或 集群模式
            4. 生产者属性配置:RocketMQProducerProperties.java
              1. 组属性配置
              2. 事务消息属性配置
              3. 同步异步属性配置
            5. RocketMQExtendedBindingProperties
    2. 分析总结:
      1. 配置项目太少,所以在spring cloud stream加持下RocketMQ的一些功能是实现不了的。即使用spring cloud stream + RocketMQ比单独使用RocketMQ时,功能要少一些,只提供了核心的常用的功能配置属性。
      2. 以等alibaba升级了版本后,直接去看生产者或消费者的属性配置类(RocketMQProducerProperties.java、RocketMQConsumerProperties.java)多加了哪些属性,多加了属性就表明相应地会增加一些对应的功能。

    五 配置应用

    1 stream + rocketmq的HelloWorld程序

    1. 第一步:新建springboot项目、pom.xml
      1. 第1步:引入spring cloud stream的起步依赖: 

        <dependency>
            <groupId>com.alibaba.cloudgroupId>
            <artifactId>spring-cloud-starter-stream-rocketmqartifactId>
        dependency>

      2. 第2步:依赖父项目spring cloud alibaba,解决了spring cloud stream的起步依赖没有版本号从而不能引入的问题:
        <spring-cloud-alibaba.version>2.2.1.RELEASEspring-cloud-alibaba.version>

      3. 注意:兼容性问题:

        注意版本需要使用springboot2.2.5

        <spring-boot.version>2.2.5.RELEASEspring-boot.version>
        <spring-cloud-alibaba.version>2.2.1.RELEASEspring-cloud-alibaba.version>

      4. 第3步:依赖spring-cloud-starter-web,有

    2. 第二步:application.properties配置文件:组一样竞争关系

      ########## RocketMQ 通用配置
      # 配置绑定器binder连接某个消息中间件(由ip和port指定),高度抽象,用于屏蔽不同消息中间件之间的差异(相当于ORM框架中的hibernate和mybatis)
      spring.cloud.stream.rocketmq.binder.name-server=192.168.172.128:9876

      # 日志级别
      logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO

      ########## Consumer Config###   bindings是微服务与具体消息中间件沟通(发消息,收消息)的桥梁   #######
      # input 的配置:
      spring.cloud.stream.bindings.input.destination=test-topic
      spring.cloud.stream.bindings.input.content-type=text/plain
      spring.cloud.stream.bindings.input.group=test-group

      ########## Produce Config
      # output 的配置如下:
      spring.cloud.stream.bindings.output.destination=test-topic
      spring.cloud.stream.bindings.output.content-type=text/plain

      spring.cloud.stream.bindings.output.group=test-group

    3. 第三步:编码

      1. 第1步:SendService.java

        1. 发送消息:

          @EnableBinding(Source.class)
          @Service
          public class SenderService {

              //Source是spring cloud stream封装的 
              @Autowired
              private Source source;
              //发送消息的方法
              public void send(String msg) throws Exception {

          //获取订阅通(信)道:source.output()==MessageChannel

          //发送消息MessageChannel.send()的参数是:Message.java

          //                                                         返回值:布尔类型
                  boolean flag = source.output().send(MessageBuilder.withPayload(msg).build());
                  System.out.println("消息发送:" + flag);
              }
          }

      2. 第2步:Application.java发送消息的方式(或使用XxxController.java)

      3. ReceiveService.java接收消

        1. 方案1:手动调用方法去接收(while true死循环):Sink.input获取订阅信道

          //获取消息通(信)道:source.input()==SubscribableChannel

          //发送消息SubscribableChannel.subscrible()的参数是:Message.java

        2. 方案2:自动监听方式@EnableBinding(Sink.class) + @StreamListener("input")

          @EnableBinding(Sink.class)
          public class ReceiveService {

              @StreamListener("input")
              public void receiveInput1(String receiveMsg) {
                  System.out.println("input 接收到的消息: " + receiveMsg);
              }
          }

    4. 第四步:验证

      1. MQ管控台 

        1. 目的地有消息:spring.cloud.stream.bindings.input.destination=test-topic

        2. 消息被消费

        3. 控制台打印

           

    2 自定义信道:input1 + output1 + 扩展多个inputX多个outputX

       在前面的案例中,我们已经实现了一个基础的 Spring Cloud Stream 消息传递处理操作,但在操作之中使用的是系统提供的 Source (output)、Sink(input),接下来我们来看一下自定义信道名称;

    1. 第一步:pom.xml
    2. 第二步:application.properties::组一样竞争关系
      1. ########## 自定义
        # input 的配置:
        spring.cloud.stream.bindings.input1.destination=test-topic1
        spring.cloud.stream.bindings.input1.content-type=text/plain
        spring.cloud.stream.bindings.input1.group=test-group1

        # output 的配置:
        spring.cloud.stream.bindings.output1.destination=test-topic1
        spring.cloud.stream.bindings.output1.content-type=text/plain

      2. spring.cloud.stream.bindings.input1.group=test-group1

    3. 第三步:编码

      1. 第1步:自定义Source接口

        public interface MySource {

        String OUTPUT1 = "output1";


            @Output(MySource.OUTPUT1)

            MessageChannel output1();
        }

      2. 第2步:发送消息业务层,SendService.java

      3. 第3步:自定义Sink接口

        public interface MySink {

        String INPUT1 = "input1";

            @Input(MySink.INPUT1)

            SubscribableChannel input1();

        }

      4. 第4步:接收消息业务层,ReceiveService.java

      5. 第5步:applicaton.java 或 XxxController.java中发送、消费消息

        1. 发送消息

        2. 接收消息

          1. 传统方式:手动调用receive方法(while死循环)

          2. 自动监听方式:@EnableBinding(Sink.class) + @StreamListener("input")

    4. 第四步:验证

    5. 第五步:扩展

      1. 第1步:多个发送

      2. 第2步:多个消费

    3 Stream + RocketMQ事务消息

    (1)流原

    1. 第一步:生产者MQ Producer发送一条“半”消息Send Half Msg到消息中间件服务器MQ Server,即并不提交。
      1. 注:此时这条“半”消息对于消费者MQ Subscriber来说是不能消费、甚至说是不可见的
    2. 第二步:消息中间件服务器MQ Server返回给生产者MQ Producer一个响应,如200 OK。
    3. 第三步:生产者MQ Producer执行本地的事务Local Transaction
      1. 注:本地事务指:你做业务逻辑处理,比如说你要插入订单、比如说你要修改订单状态。
    4. 第四步:如果本地的事务Local Transaction提交成功,返回提交Commit,否则返回回滚Rollback,给到消息服务器MQ Server。消息服务器MQ Server接收到提交Commit后,把“半”消息Send Half Msg变成了“可用的”消息。如果消息服务器MQ Server接收到回滚Rollback,那么消息服务器MQ Server就会把这条消息删除掉Rollback Delete Msg。
      1. 注:此时这条“可用消息”就可以让消费者MQ Subscriber可见、可消费了。
    5. 第五步:如果本地的事务Local Transaction一直没有给消息服务器MQ Server发送状态(提交或回滚状态),即超时。那么消息服务器MQ Server会回调(5 Check back)生产者MQ Producer的方法。
    6. 第六步:生产者MQ Producer的回调(5 Check back)方法中,要写代码去检查本地事务(6 check the state of local Transaction)。检查什么呢?即检查本地事务插入订单操作有没有成功,我们可以通过查询数据库来得到结果。
    7. 第七步:如果检查的结果是插入成功,可以返回提交Commit,否则返回回滚Rollback,给到消息服务器MQ Server。

    (2)配置应用:Stream + RocketMQ事务消息

    1. 第一步:创建新的springboot项目
    2. 第二步:pom.xml加依赖::spring-cloud-starter-rocketmq
    3. 第三步:application.properties

      #-------------------------- 事务消息 begin --------------------------------
      #生产的配置
      spring.cloud.stream.bindings.outputTX.destination=TransactionTopic
      spring.cloud.stream.bindings.outputTX.content-type=application/json
      spring.cloud.stream.rocketmq.bindings.outputTX.producer.group=myTxProducerGroup
      #是否为事务消息,默认为false表示不是事务消息,true表示是事务消息
      spring.cloud.stream.rocketmq.bindings.outputTX.producer.transactional=true

      #消费的配置:
      spring.cloud.stream.bindings.inputTX.destination=TransactionTopic
      spring.cloud.stream.bindings.inputTX.content-type=text/plain
      spring.cloud.stream.bindings.inputTX.group=transaction-group
      spring.cloud.stream.rocketmq.bindings.inputTX.consumer.broadcasting=false

      1#-------------------------- 事务消息 end --------------------------------

    4. 第四步:编码
      1. 第1步:生产者
        1. a.自定义信道

          public interface MySource {

                       String OUTPUTTx= "outpuTx";


                       @Output(MySource.OUTPUTTX)

                       MessageChannel outputTX();
          }

        2. b.发送事务消息,SenderService.java
          1. @Autowired
          2. private MySource mySource;
          3. public void sendTransactionalMsg(T msg, int num) throws Exception {
          4. // 需要构建的消息对象MessageBuilder
          5. MessageBuilder builder = MessageBuilder.withPayload(msg)
          6. .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
          7. builder.setHeader("test", String.valueOf(num));
          8. Message message = builder.build();
          9. boolean flag = mySource.outputTX().send(message);
          10. System.out.println("inputTX 事务消息发送:" + flag);
          11. }
        3. c.执行本地事务和本地事务检查
          1. // 执行本地事务和本地事务检查:
          2. // 监听器
          3. @RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
          4. public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
          5. @Override
          6. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
          7. Object num = msg.getHeaders().get("test");
          8. if ("1".equals(num)) {
          9. System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
          10. // 未知,需要二次检查
          11. return RocketMQLocalTransactionState.UNKNOWN;
          12. } else if ("2".equals(num)) {
          13. System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
          14. return RocketMQLocalTransactionState.ROLLBACK;
          15. }
          16. System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
          17. return RocketMQLocalTransactionState.COMMIT;
          18. }
          19. // 回调检查
          20. @Override
          21. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
          22. System.out.println("check: " + new String((byte[]) msg.getPayload()));
          23. // 假设回调检查没有异常,则返回commit
          24. return RocketMQLocalTransactionState.COMMIT;
          25. }
          26. }
        4. d.程序入口类application.java

      2. 第2步:消费者

        1. a.MySink.java

        2. b.ReceiveService.java

          1. 监听器自动接收消费

          2. .output 或 .input手动调用消息

      3. 第3步:程序入口类

    5. 第五步:测试验证

      1. 生产者

        1. ​​​​​​​返回commit,事务正常提交,消费者可见、也可消费消息

        2. 返回rallback,事务回滚,消息删除

        3. 超时,运行二次检查方法

      2. ​​​​​​​消费者

        1. ​​​​​​​成功接收到所有的消息

    4 纲:生产者和消费者按标签tag发送或消费消息 

    六 开发的时候有两种选择

    1. 一种就是 直接SpringBoot + RocketMQ整合实现消息传送;
    2. 一种就是 使用Spring Cloud Stream对消息中间件的包装,来实现消息传送;
  • 相关阅读:
    精准用户画像!商城用户分群2.0!
    【4 进程与线程】
    配置你的VsCode服务器,随时随地写代码!
    ABP VNext添加全局认证(如何继承AuthorizeFilter)
    Matlab/simulink双馈异步风力发电机滑膜控制建模仿真
    多边形碰撞检测算法
    玩转外贸LinkedIn必备的三大特质,以及突破六度人脉技巧
    C语言之for while语句详解
    离线量化(后量化)算法研究-----脉络梳理
    uboot启动流程-uboot内存分配工作总结
  • 原文地址:https://blog.csdn.net/aiwokache/article/details/126784977