-
org.springframework.cloud -
spring-cloud-starter-stream-rocketmq -
添加注解添加@ EnableBing(Source.class) 注解,如图所示:
3.写配置(application.yml):
- spring:
- cloud:
- stream:
- rocketmq:
- binder:
- name-server: 127.0.0.1:9876
- bindings:
- output:
- # 用来指定topic
- destination: stream-test-topic
4.生产者发送消息:
- @GetMapping("test-stream")
- public String testStream(){
- this.source.output()
- .send(
- MessageBuilder
- .withPayload("消息体")
- .build()
- );
- return "success";
- }
-
org.springframework.cloud -
spring-cloud-starter-stream-rocketmq -
2.添加注解添加@ EnableBing(Sink.class) 注解,如图所示:

3.写配置:
- spring:
- cloud:
- stream:
- rocketmq:
- binder:
- name-server: 127.0.0.1:9876
- bindings:
- input:
- destination: stream-test-topic
- group: binder-group # 这里的group 一定要设置; 如果使用的不是rocketmq的话,这里可以不用设置,可以留空
-
4.编写监听消费类
- @Service
- @Slf4j
- public class TestStreamConsumer{
- @StreamListener(Sink.INPUT)
- public void receive(String messageBody){
- log.info("通过stream收到了消息: messageBody = {}");
-
- }
- }
到此已经完成了rocketmq的基本操作,我们使用的监听配置类都是java默认自带的,我们也可以自定义,举个生产者的例子:
1.定义类
- public interface MySource{
- String MY_OUTPUT= "my-output";
-
- @Output(MY_OUTPUT)
- MessageChannel output();
- }
2.启动类添加注解

3.加配置

其他都是雷同的,就不一一列举了
当我们的逻辑代码中,不仅仅对数据库做了处理,一些场景下我们需要同时进行消息发送和与MySQL进行交互的功能;此图中,我们首先进行了消息发送,然后再把消息写入缓存,那么就会导致: 如果写入缓存的时候,代码执行失败,回滚操作只能回滚数据库,消息已经被消费者监听到了并做了处理了。
简单来说RocketMQ实现分布式事务的原理是: 执行到应该发送消息的时候,它并未发送,而是处于“准备发送”阶段,当所有的代码都已执行完毕且无异常时,则进行完全发送,此刻消息消费者才能监听到消息;
我们本文主要讲的事spring cloud stream的分布式事务,但是SpringCloud Stream 本身没有实现分布式事务,它与RocketMQ结合则是使用RocketMQ的分布式事务。它若与其他结合,则使用其他消息中间件的分布式事务。

执行代码:
- create table rocketmq_transaction_log(
- id int auto_increment comment 'id' primary key,
- transaction_Id varchar(45) not null comment '事务id',
- log varchar(45) not null comment '日志')
2.消息生产者编写:发送半消息:
- //列举两种方式,一种是原始的rocketmqTempalte发送消息,一种是spring cloud stream发送消息
-
-
- 一:String transactionId=UUID.randomUUID().toString()
- this.rocketMQTemplate.sendMessageInTransaction(
- "tx-add-bonus-group","add-bonus",MessageBuilder.withPayload(
- UserAddBonusMsgDTO.builder().userId(share.getUserId)
- .bonus(50)
- .build()
- ).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId)
- .setHeader("share_id",id)
- .build(),
- auditDTO
- )
-
-
- 二:

3.修改配置文件(注:之前编写生产者配置文件,是不需要添加分组的,但是现在我们不使用spring cloud stream的方式,而使用原始的事务方式监听,所以需要在生产者的rocketmq配置下编写事务配置和分组,然后消费者直接监听即可 )如图

4.消息的监听定义组名称时,一定要与生产者配置文件中的保持一致,如图所示

5.消费者编写如下
- @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
- @RequiredArgsConstructor(onConstructor = @_(@Autowired))
- public class AddBonusTransactionListener implements RocketMQLocalTransactionListener{
-
- @Override
- public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg)
- String transactionId(String)headers.get(RocketMQHeaders.TRANSACTION_ID);
- Integer shareId= Integer.valueOf((String)headers.get("share_id"))
- try{
- this.shareService.auditByIdInDB(shareId,(ShareAuditDTO) arg)
- return RocketMQLocalTransactionState.COMMIT;
- }catch(Exception e){
- return RocketMQLocalTransactionState.ROLLBACK;
- }
- }
- // 编写回查代码,当我们
- @Override
- public RocketMQLocalTransactionState checkLocalTransaction(Message msg){
- return null;
- }
- }
当我们执行成功,则执行RocketMQLocalTransactionState.COMMIT,失败则ROLLBACK。但是有这样一种情况,比如我们已经执行完逻辑代码,正准备COMMIT提交,此时突然停电了,导致数据已经存入,但是却没有提交成功。所以我们需要一个回查方法,checkLocalTransaction()是一个回查方法,它会去里面进行判断是否执行成功。结合我们已经建立的RocketMQ事务表,我们可以进行回查操作,代码看下方:
- //新建一个存入方法,我们之前的存入方法,没有将事务数据加入日志表,我们可以这样改造: 当数据存入的时候,将数据存入日志表;回查方法就进行回查,如果没有存入则表示执行失败:
- @Autowired
- private RocketmqTransactionLogMapepr rocketmqTransactionLogMapepr;
-
- @Transactional(rollbackFor= Exception.class)
- public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId){
- //异步业务代码
- this.auditByIdInDB(id,auditDTO);
- //新增rocketmq事务id,表示已提交,可以commit
- this.rocketmqTransactionLogMapper.insertSelective(
- RocketmqTransactionLog.builder().transactionId(transactionId)
- .log("审核分享")
- .build()
- );
- }
消息消费者重写:
- @Autowired
- private ShareService shareService;
- @Autowired
- priavte RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
- @RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
- @RequiredArgsConstructor(onConstructor = @_(@Autowired))
- public class AddBonusTransactionListener implements RocketMQLocalTransactionListener{
-
- @Override
- public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg)
- String transactionId(String)headers.get(RocketMQHeaders.TRANSACTION_ID);
- Integer shareId= Integer.valueOf((String)headers.get("share_id"))
- try{
- //上边编写的方法
- this.shareService.auditByIdWIthRocketMqLog(shareId,(ShareAuditDTO) arg,transactionId)
-
- return RocketMQLocalTransactionState.COMMIT;
- }catch(Exception e){
- return RocketMQLocalTransactionState.ROLLBACK;
- }
- }
- // 编写回查代码,当消息长时间未被消费,就会回调这个函数
- @Override
- public RocketMQLocalTransactionState checkLocalTransaction(Message msg){
- MessageHeaders headers= msg.getHeaders();
- String transactionId= (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
- // 查询是否存了事务数据
- this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());
- // 判断是否提交
- if(transactionLog != null){
- return RocketMQLocalTransactionState.COMMIT;
- }
- return RocketMQLocalTransactionState.ROLLBACK;
- }
- }