• Rocketmq讲解以及使用Spring Cloud Stream操作


    安装:

    RocketMQ 4.5.1安装教程_慕课手记

     搭建RocketMQ控制台

     RocketMQ控制台安装教程_慕课手记

     Spring Cloud Stream是什么?

    • 是一个用于构建消息驱动的微服务的框架
    • 可实现kafka,rabbitmq,rocketmq的无感知切换
    • 当消息生产者使用Kafka发送消息,那只能用Kafka来接收消息。当使用SpringCloudStream来处理消息的话,我们接收Kafka的消息,可以使用其他的消息中间件来进行接收。SpringCloudStream对消息进行了一层封装,所以我们不需要去关心生产者用的是什么消息中间件。
       

     Spring Cloud Stream编写rocketmq生产者?

    1.   添加依赖:
      1. org.springframework.cloud
      2. spring-cloud-starter-stream-rocketmq
    2. 添加注解添加@ EnableBing(Source.class) 注解,如图所示:

      

     3.写配置(application.yml):

    1. spring:
    2. cloud:
    3. stream:
    4. rocketmq:
    5. binder:
    6. name-server: 127.0.0.1:9876
    7. bindings:
    8. output:
    9. # 用来指定topic
    10. destination: stream-test-topic

     4.生产者发送消息:

    1. @GetMapping("test-stream")
    2. public String testStream(){
    3. this.source.output()
    4. .send(
    5. MessageBuilder
    6. .withPayload("消息体")
    7. .build()
    8. );
    9. return "success";
    10. }

    Spring Cloud Stream 消息消费者?

    1. 添加依赖:
    1. org.springframework.cloud
    2. spring-cloud-starter-stream-rocketmq

     2.添加注解添加@ EnableBing(Sink.class) 注解,如图所示:

     

     3.写配置:

    1. spring:
    2. cloud:
    3. stream:
    4. rocketmq:
    5. binder:
    6. name-server: 127.0.0.1:9876
    7. bindings:
    8. input:
    9. destination: stream-test-topic
    10. group: binder-group # 这里的group 一定要设置; 如果使用的不是rocketmq的话,这里可以不用设置,可以留空

    4.编写监听消费类

    1. @Service
    2. @Slf4j
    3. public class TestStreamConsumer{
    4. @StreamListener(Sink.INPUT)
    5. public void receive(String messageBody){
    6. log.info("通过stream收到了消息: messageBody = {}");
    7. }
    8. }

     到此已经完成了rocketmq的基本操作,我们使用的监听配置类都是java默认自带的,我们也可以自定义,举个生产者的例子:

    1.定义类

    1. public interface MySource{
    2. String MY_OUTPUT= "my-output";
    3. @Output(MY_OUTPUT)
    4. MessageChannel output();
    5. }

    2.启动类添加注解

     3.加配置

     其他都是雷同的,就不一一列举了

     下边讲一下mq的分布式事务

    什么场景使用?

    当我们的逻辑代码中,不仅仅对数据库做了处理,一些场景下我们需要同时进行消息发送和与MySQL进行交互的功能;此图中,我们首先进行了消息发送,然后再把消息写入缓存,那么就会导致: 如果写入缓存的时候,代码执行失败,回滚操作只能回滚数据库,消息已经被消费者监听到了并做了处理了。

    rocketmq如何做到分布式事务?

    简单来说RocketMQ实现分布式事务的原理是: 执行到应该发送消息的时候,它并未发送,而是处于“准备发送”阶段,当所有的代码都已执行完毕且无异常时,则进行完全发送,此刻消息消费者才能监听到消息;

    概念术语讲解:

    • 半消息(Half(Prepare) Message)
      • 暂时无法消费的消息。生产者将消息发送到了MQ server,但这个消息会被标记为“暂不能投递”状态,先存储起来;消费者不会去消费这条消息
    • 消息回查(Message Status Check)
      • 网络断开或生产者重启可能会导致丢失事务消息的第二次确认。当MQ Server发现消息长时间处于半消息状态时,将向消息生产者发送请求,询问该消息的最终状态(提交或回滚)
    • 消息三态:
      • Commit:提交事务消息,消费者可以消费此消息
      • Rollback: 回滚事务消息,broker会删除该消息,消费者不能消费
      • UNKNOWN: broker需要回查确认该消息的状态

    我们本文主要讲的事spring cloud stream的分布式事务,但是SpringCloud Stream 本身没有实现分布式事务,它与RocketMQ结合则是使用RocketMQ的分布式事务。它若与其他结合,则使用其他消息中间件的分布式事务。

     如何做到分布式事务?

    1. 到数据库中新增一张表,用来记录 RocketMQ的事务日志:

        执行代码:

        

    1. create table rocketmq_transaction_log(
    2. id int auto_increment comment 'id' primary key,
    3. transaction_Id varchar(45) not null comment '事务id',
    4. log varchar(45) not null comment '日志')

     2.消息生产者编写:发送半消息:

    1. //列举两种方式,一种是原始的rocketmqTempalte发送消息,一种是spring cloud stream发送消息
    2. 一:String transactionId=UUID.randomUUID().toString()
    3. this.rocketMQTemplate.sendMessageInTransaction(
    4. "tx-add-bonus-group","add-bonus",MessageBuilder.withPayload(
    5. UserAddBonusMsgDTO.builder().userId(share.getUserId)
    6. .bonus(50)
    7. .build()
    8. ).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId)
    9. .setHeader("share_id",id)
    10. .build(),
    11. auditDTO
    12. )
    13. 二:

     

    3.修改配置文件(注:之前编写生产者配置文件,是不需要添加分组的,但是现在我们不使用spring cloud stream的方式,而使用原始的事务方式监听,所以需要在生产者的rocketmq配置下编写事务配置和分组,然后消费者直接监听即可 )如图

     4.消息的监听定义组名称时,一定要与生产者配置文件中的保持一致,如图所示

     5.消费者编写如下

    1. @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
    2. @RequiredArgsConstructor(onConstructor = @_(@Autowired))
    3. public class AddBonusTransactionListener implements RocketMQLocalTransactionListener{
    4. @Override
    5. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg)
    6. String transactionId(String)headers.get(RocketMQHeaders.TRANSACTION_ID);
    7. Integer shareId= Integer.valueOf((String)headers.get("share_id"))
    8. try{
    9. this.shareService.auditByIdInDB(shareId,(ShareAuditDTO) arg)
    10. return RocketMQLocalTransactionState.COMMIT;
    11. }catch(Exception e){
    12. return RocketMQLocalTransactionState.ROLLBACK;
    13. }
    14. }
    15. // 编写回查代码,当我们
    16. @Override
    17. public RocketMQLocalTransactionState checkLocalTransaction(Message msg){
    18. return null;
    19. }
    20. }

    当我们执行成功,则执行RocketMQLocalTransactionState.COMMIT,失败则ROLLBACK。但是有这样一种情况,比如我们已经执行完逻辑代码,正准备COMMIT提交,此时突然停电了,导致数据已经存入,但是却没有提交成功。所以我们需要一个回查方法,checkLocalTransaction()是一个回查方法,它会去里面进行判断是否执行成功。结合我们已经建立的RocketMQ事务表,我们可以进行回查操作,代码看下方:
     

    1. //新建一个存入方法,我们之前的存入方法,没有将事务数据加入日志表,我们可以这样改造: 当数据存入的时候,将数据存入日志表;回查方法就进行回查,如果没有存入则表示执行失败:
    2. @Autowired
    3. private RocketmqTransactionLogMapepr rocketmqTransactionLogMapepr;
    4. @Transactional(rollbackFor= Exception.class)
    5. public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId){
    6. //异步业务代码
    7. this.auditByIdInDB(id,auditDTO);
    8. //新增rocketmq事务id,表示已提交,可以commit
    9. this.rocketmqTransactionLogMapper.insertSelective(
    10. RocketmqTransactionLog.builder().transactionId(transactionId)
    11. .log("审核分享")
    12. .build()
    13. );
    14. }

    消息消费者重写:

    1. @Autowired
    2. private ShareService shareService;
    3. @Autowired
    4. priavte RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    5. @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
    6. @RequiredArgsConstructor(onConstructor = @_(@Autowired))
    7. public class AddBonusTransactionListener implements RocketMQLocalTransactionListener{
    8. @Override
    9. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg)
    10. String transactionId(String)headers.get(RocketMQHeaders.TRANSACTION_ID);
    11. Integer shareId= Integer.valueOf((String)headers.get("share_id"))
    12. try{
    13. //上边编写的方法
    14. this.shareService.auditByIdWIthRocketMqLog(shareId,(ShareAuditDTO) arg,transactionId)
    15. return RocketMQLocalTransactionState.COMMIT;
    16. }catch(Exception e){
    17. return RocketMQLocalTransactionState.ROLLBACK;
    18. }
    19. }
    20. // 编写回查代码,当消息长时间未被消费,就会回调这个函数
    21. @Override
    22. public RocketMQLocalTransactionState checkLocalTransaction(Message msg){
    23. MessageHeaders headers= msg.getHeaders();
    24. String transactionId= (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
    25. // 查询是否存了事务数据
    26. this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());
    27. // 判断是否提交
    28. if(transactionLog != null){
    29. return RocketMQLocalTransactionState.COMMIT;
    30. }
    31. return RocketMQLocalTransactionState.ROLLBACK;
    32. }
    33. }

  • 相关阅读:
    AD域控服务器部署
    软件开发详解:同城O2O与外卖跑腿系统源码的架构与开发要点
    Unity-UV展开工具
    跟我学C++中级篇——右值引用和万能引用
    【EI会议征稿】第三届机械、建模与材料工程国际学术会议(I3ME 2023)
    YoloV8改进策略:SwiftFormer,全网首发,独家改进的高效加性注意力用于实时移动视觉应用的模型,重构YoloV8
    C++多线程编程(第三章 案例3:把案例1改装成案例2的条件变量多线程方式)
    内核驱动踩坑记录
    21.Spring Cloud Gateway 简介
    C++ 中 cin 和 getline 的使用
  • 原文地址:https://blog.csdn.net/weixin_59244784/article/details/127122631