• 【Spring Boot 集成应用】RocketMQ的集成用法(下)


    1. RocketMQ集成之异步发送

    异步发送能够提升发送效率, 适合高并发场景下使用, 基于RocketMQ集成之普通消息发送做改造:

    1. 增加异步发送接口

      com.mirson.spring.boot.mq.rocket.basic.provider.RocketMqProviderContorller

          /**
           * 异步发送消息
           * @return
           */
          @GetMapping("/asyncSendString")
          public String asyncSendString() {
      
              for(int i=0; i<10; i++) {
      
                  String msg = "seq number: " + i;
                  final String seq = String.valueOf(i);
                  // 异步方式发送
                  rocketMQTemplate.asyncSend(RabbitMqConfig.TOPIC, msg, new SendCallback() {
                      public void onSuccess(SendResult sendResult) {
                          // 发送成功回调处理
                          log.info("seq number:  " + seq + ", send result: " + sendResult.getSendStatus());
                      }
      
                      public void onException(Throwable e) {
                          // 发送异常回调处理
                          log.error(e.getMessage(), e);
                      }
                  });
              }
      
              return "async send success";
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 增加asyncSendString异步发送接口, 连续发送十条有序消息, 调用rocketMQTemplate的asyncSend方法, 实现异步发送, SendCallback内部提供了onSuccess与onException两个回调方法,可以针对性的做相应业务处理。
      • 同步方式发送, 能保证消息有序传递, 这里采用异步发送,不能保证消息能够有序接收, 在实际使用中, 要结合具体的业务场景使用。
    2. 测试验证

      • 调用异步发送接口
        在这里插入图片描述

        异步方式发送十条消息。

      • 监听器日志

        在这里插入图片描述

        十条消息全部接收成功, 注意消息的发送顺序, 与订阅的接收顺序, 没有保持一致;异步方式能够提升发送效率, 但缺点是不能保障消息的有序消费,在实际使用中, 可以结合同步锁来使用, 比如可以根据账户ID加锁, 因每个账户数据具有独立性, 这样可以提升消息的传递发送效率, 又能保障每个账户接收到的数据是有序的。

    2. RocketMQ集成之ACL权限控制

    ACL是Access Control List简称, 意为访问控制列表, 是RocketMQ4.4新加入的功能。加入ACL能够通过权限管理控制消息队列, 针对不同角色用户分配不同的队列操作权限, 便于权限管控, 提升消息队列数据的安全性。

    1. ACL基本处理流程
      在这里插入图片描述

    2. 创建rocketmq-acl工程
      在这里插入图片描述

    3. 工程配置

      application.yml

      server:
        port: 12615
      spring:
        application:
          name: rocketmq-acl
      
      # RocketMQ配置
      rocketmq:
        name-server: 10.10.20.15:9876
        # 生产者配置
        producer:
          group: basic-group
          # 权限信息
          access-key: rocketmq2
          secret-key: 12345678
        # 消费者配置
        consumer:
          # 权限信息
          accessKey: rocketmq2
          secret-key: 12345678
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21

      注意producer与consumer, 都需配置权限信息, accessKey相当于用户名, secret-key相当于密码。

      这些信息不是随便填写, 要与RocketMQ服务配置文件保持一致。

    4. 定义消息监听器

      com.mirson.spring.boot.mq.rocket.acl.consume.StrSpringMessageConsumer

      @Service
      @RocketMQMessageListener(
              topic = RabbitMqConfig.TOPIC_SPRING_MESSAGE,
              consumerGroup = RabbitMqConfig.CONSUME_GROUP_SPRING_MESSAGE
      //        accessKey = "RocketMQ", // 不需再填写, 会自动从配置文件中读取
      //        secretKey = "12345678" // 不需再填写, 会自动从配置文件中读取
      )
      @Log4j2
      public class StrSpringMessageConsumer implements RocketMQListener<String> {
      
          @Override
          public void onMessage(String str) {
              log.info("StrSpringMessageConsumer => receive str: " + str);
          }
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

      通过RocketMQMessageListener注解, 也可以配置accessKey与secretKey信息, 但在工程配置文件中我们已经填好, 系统启动会自动读取, 可以不用再填写。

    5. 定义发送接口

      com.mirson.spring.boot.mq.rocket.acl.provider.RocketMqProviderContorller

      /**
           * 发送RocketMQ Spring Message封装消息
           * @return
           */
          @GetMapping("/sendSpringMessage")
          public String sendSpringMessage() {
              String msg = "random number: " + RandomUtils.nextInt(0, 100);
              // Send Spring Message With String
              SendResult result = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC_SPRING_MESSAGE , MessageBuilder.withPayload(msg).build());
              log.info("send result: " + result.getSendStatus());
              return msg;
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12

      发送一个Spring Message封装的消息, 调用rocketMQTemplate的syncSend方法发送数据, 无需加额外参数。

    6. RabiitMQ服务端用户权限设置

      测试之前, 先要确保RocketMQ服务器开启了ACL验证功能。

      • 开启ACL验证

        修改%RABBITMQ_HOME%/conf/broker.conf文件, 末尾增加:

        #开启ACL权限控制功能
        aclEnable=true
        
        • 1
        • 2
      • 分配用户权限

        修改%RABBITMQ_HOME%/conf/plain_acl.yml文件:

        globalWhiteRemoteAddresses:
        
        accounts:
        - accessKey: RocketMQ
          secretKey: 12345678
          whiteRemoteAddress:
          admin: false
          defaultTopicPerm: DENY
          defaultGroupPerm: SUB
          topicPerms:
          - topicA=DENY
          - topicB=PUB|SUB
          - topicC=SUB
          groupPerms:
          # the group should convert to retry topic
          - groupA=DENY
          - groupB=PUB|SUB
          - groupC=SUB
         
        - accessKey: rocketmq2
          secretKey: 12345678
          whiteRemoteAddress: 192.168.1.*
          # if it is admin, it could access all resources
          admin: true
          defaultTopicPerm: PUB|SUB
          defaultGroupPerm: SUB
          topicPerms:
          - topicA=DENY
          - topicB=PUB|SUB
          - topic_acl_spring_message=PUB|SUB
          - topic_acl_transaction_spring_message=PUB|SUB
          - topicC=SUB
          groupPerms:
          # the group should convert to retry topic
          - groupA=DENY
          - groupB=PUB|SUB
          - groupC=SUB
          - group_acl_spring_message=PUB|SUB
          - group_acl_transaction_spring_message=PUB|SUB
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 默认该配置文件下会有两个用户, RocketMQ与rocketmq2, 这里要修改rocketmq2的权限。

        • rocketmq2用户虽然具有admin权限, 但是Rocketmq的ACL处理源码仍要读取topicPerms属性配置,否则会报错, 这里追加我们用于ACL测试的相关TOPIC与GROUP, 确保rocketmq2用户拥有测试时的所有权限, RocketMQ用户则无权限:

          defaultTopicPerm: PUB|SUB
            defaultGroupPerm: SUB
            topicPerms:
            - topicA=DENY
            - topicB=PUB|SUB
            - topic_acl_spring_message=PUB|SUB
            - topic_acl_transaction_spring_message=PUB|SUB
            - topicC=SUB
            groupPerms:
            # the group should convert to retry topic
            - groupA=DENY
            - groupB=PUB|SUB
            - groupC=SUB
            - group_acl_spring_message=PUB|SUB
            - group_acl_transaction_spring_message=PUB|SUB
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
        • 权限控制参数说明

          字段取值含义
          globalWhiteRemoteAddresses;192.168..*;192.168.0.1全局IP白名单
          accessKey字符串Access Key 用户名
          secretKey字符串Secret Key 密码
          whiteRemoteAddress;192.168..*;192.168.0.1用户IP白名单
          admintrue;false是否管理员账户
          defaultTopicPermDENY;PUB;SUB;PUB|SUB默认的Topic权限
          defaultGroupPermDENY;PUB;SUB;PUB|SUB默认的ConsumerGroup权限
          topicPermstopic=权限各个Topic的权限
          groupPermsgroup=权限各个ConsumerGroup的权限
    7. 测试验证

      • 权限分配好后, 重启RabbitMQ服务, 要确保读取的是我们修改的配置文件。

        1. 启动NameServer

          nohup bin/mqnamesrv >/dev/null 2>&1 &

        2. 启动Broker

          sh bin/mqbroker -n 127.0.0.1:9876 -c /usr/local/rocketmq4.4/conf/broker.conf &

        3. 关闭Broker

          bin/mqshutdown broker

        4. 关闭name server

          bin/mqshutdown namesrv

      • 使用RocketMQ用户,发送数据, 预期应该是无权限。

        修改配置文件, 启动服务:

        # RocketMQ配置
        rocketmq:
          name-server: 10.10.20.15:9876
          # 生产者配置
          producer:
            group: basic-group
            # 权限信息
            access-key: RocketMQ
            secret-key: 12345678
          # 消费者配置
          consumer:
            # 权限信息
            accessKey: RocketMQ
            secret-key: 12345678
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
      • 访问发送接口

        http://127.0.0.1:12615/sendSpringMessage

        在这里插入图片描述

        出现异常, 查看控制台日志:

        在这里插入图片描述

        没有该主题topic_acl_spring_message的操作权限, ACL正常生效。

      • 使用rocketmq2用户,发送数据,预期是可以正常发送与接收

        修改配置文件:

        # RocketMQ配置
        rocketmq:
          name-server: 10.10.20.15:9876
          # 生产者配置
          producer:
            group: basic-group
            # 权限信息
            access-key: rocketmq2
            secret-key: 12345678
          # 消费者配置
          consumer:
            # 权限信息
            accessKey: rocketmq2
            secret-key: 12345678
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
      • 调用发送接口
        在这里插入图片描述

        查看控制台监听日志:
        在这里插入图片描述

        通过ACL权限控制, 能够正常发送与接收队列数据。

    3. RocketMQ集成之Transaction事务消息

    假设场景,用户进行转账, 先扣除自身的账户金额, 再发送消息通知, 增加对方的账户金额, 在发送消息通知的过程中如果失败该如何处理? 为了解决本地事务执行与消息发送的原子性问题, RocketMQ推出了Transaction事务消息(并非分布式事务解决方案, 但可以基于此功能,与补偿机制实现一套方案)。 具体处理机制:

    在这里插入图片描述

    1. 仍采用ACL机制, 基于rocketmq-acl工程改造实现

    2. 增加接收监听器

      com.mirson.spring.boot.mq.rocket.acl.consume.StringTransactionConsumer

      @Service
      @RocketMQMessageListener(
              topic = RabbitMqConfig.TOPIC_SPRING_TRANSACTION_MESSAGE,
              consumerGroup = RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE
      )
      @Log4j2
      public class StringTransactionConsumer implements RocketMQListener<String> {
          @Override
          public void onMessage(String message) {
              log.info("StringTransactionConsumer => receive transaction str: " + message);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12

      这里订阅的GROUP为RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE。

    3. 自定义事务监听器

      com.mirson.spring.boot.mq.rocket.acl.config.TransactionListener

      @RocketMQTransactionListener(
              txProducerGroup = RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE
      )
      @Log4j2
      public class TransactionListener implements RocketMQLocalTransactionListener {
          private AtomicInteger transactionIndex = new AtomicInteger(0);
      
          private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();
      
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
              String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
              int value = transactionIndex.getAndIncrement();
              int status = value % 3;
              localTrans.put(transId, status);
              if (status == 0) {
                   return RocketMQLocalTransactionState.COMMIT;
              }
              if (status == 1) {
                  log.info("    # ROLLBACK # Simulating %s related local transaction exec failed! {}", new String((byte[])msg.getPayload()));
                  return RocketMQLocalTransactionState.ROLLBACK;
              }
      
              return RocketMQLocalTransactionState.UNKNOWN;
          }
      
          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
              String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
              RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
              Integer status = localTrans.get(transId);
              if (null != status) {
                  switch (status) {
                      case 0:
                          retState = RocketMQLocalTransactionState.UNKNOWN;
                          break;
                      case 1:
                          retState = RocketMQLocalTransactionState.COMMIT;
                          break;
                      case 2:
                          retState = RocketMQLocalTransactionState.COMMIT;
                          break;
                  }
              }
              return retState;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 通过注解RocketMQTransactionListener实现自定义事务监听器, txProducerGroup要与上面监听器配置的Group一致,如果接收监听器不指定Group, 将采用RocketMQ默认的事务控制处理器。
      • 这里监听器的主要作用, 控制消息的提交与回滚, 通过取模计算, 将结果为1的数据进行回滚并打印。
    4. 定义发送接口

          /**
           * 发送RocketMQ Transaction 事务消息
           * @return
           */
          @GetMapping("/sendTransactionMessage")
          public String sendTransactionMessage() {
      
              for (int i = 0; i < 10; i++)
              {
                  Message msg = MessageBuilder.withPayload("seq number " + i).
                      setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
      
                  SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(RabbitMqConfig.CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE,
                      RabbitMqConfig.TOPIC_SPRING_TRANSACTION_MESSAGE, msg, null);
                  log.info("seq " + i  + " send result: " + sendResult.getSendStatus());
               }
      
              return "send transaction message success.";
          }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20

      定义sendTransactionMessage接口发送事务消息, 这里连续发送十条事务消息,调用rocketMQTemplate 的sendMessageInTransaction方法, 指定配置的组别与主题信息。

    5. 测试验证

      发送十条事务消息, 在事务监听器里面, 有部分数据会出现回滚, 下面验证, 监听器是否正常接收确认的消息, 能否接收到回滚的消息。

      • 调用接口
        在这里插入图片描述

      • 查看接收日志结果

        在这里插入图片描述

        可以看到, 成功发送了十条数据, 有4条数据出现回滚, 监听器打印接收了6条数据, 验证成功。

    4. 总结

    这里全面的讲解RocketMQ技术点, 相对较多, 也可以看出RocketMQ功能比较丰富, 有较好的扩展性,灵活性,适用各种业务场景, 不仅可以与Spring Boot 集成, 还可以支持Spring Cloud Stream 在微服务中应用。RocketMQ还支持消息轨迹跟踪, 异步顺序发送, 并发消费等, 更多功能大家可以再深入研究, 能够更好的适应生产项目对不同场景的使用要求。

  • 相关阅读:
    VR全景对行业发展有什么帮助?VR全景制作需要注意什么?
    Redis 源码简洁剖析 13 - RDB 文件
    电脑重装系统 win11 怎么关闭系统软件通知
    新型双功能螯合剂NOTA及其衍生物CAS号:147597-66-8p-SCN-Bn-NOTA
    学习心得——数据预处理(探索性数据分析)
    UVM 覆盖率
    在vue中点击右键出现自定义操作菜单
    Servlet之动态绑定应用(成绩录入)
    38.迪杰斯特拉(Dijkstra)算法
    how to alert when etl inbound file delay in GCP storage
  • 原文地址:https://blog.csdn.net/hxx688/article/details/126083777