本文主要记录Springboot集成RocketMQ来实现延时消息和事务消息
基于springboot2.6.8,客户端RocketMQ4.9.3,rocket依赖如下
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
yaml配置
rocketmq:
name-server: 192.168.0.221:9876
producer:
group: producer_group
#默认3000 过短会报timeout错误
send-message-timeout: 10000
适用场景:对某个数据在多久后做过期或者做通知之类的。
为什么要选择RocketMQ?
@Autowired
RocketMQTemplate rocketMQTemplate;
//SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel)
//delayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 对应定时的延时时间
SendResult sendResult = rocketMQTemplate.syncSend("test_topic", MessageBuilder.withPayload(
"延迟10秒" + new Date().getTime()).build(), 3000, 3);
log.info("消息延时推送成功,{},{}", sendResult.getSendStatus(), new Date().getTime());
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_topic", consumerGroup = "consume_group")
public class RocketmqConsume implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("收到消息:{},{}", s, new Date().getTime());
}
}
c.e.r.controller.RocketmqController : 消息延时推送成功,SEND_OK,1656054586384
c.e.rocketmq.consume.RocketmqConsume : 收到消息:延迟10秒1656054586381,1656054595694
可以看到时间误差是在一秒内的,在某些场景非常的实用。
实用场景:常说的二者不可兼得,有这么一个场景:
应用需要在一条消息传递过程中记录一些什么内容成功后再传给最终的消费者
那么RocketMQ中使用事务消息操作流程应该是这样的:
@Autowired
RocketMQTemplate rocketMQTemplate;
for (int i = 0; i < 10; i++) {
//模拟十个订单入库 计算返利
rocketMQTemplate.sendMessageInTransaction("transaction_topic",
MessageBuilder.withPayload(UUID.randomUUID().toString()).build(), i);
Thread.sleep(1000);
}
@Slf4j
@Component
//在目前版本中仅能注解一个是我执行者
@RocketMQTransactionListener
public class TransactionExcetor implements RocketMQLocalTransactionListener {
Map<String, Boolean> resMap = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
String orderId = new String((byte[]) message.getPayload());
//每三个订单成功一个 即订单成功入库,发送消息到账务进行返利转账
boolean res = Integer.valueOf(o.toString()) % 3 == 0;
log.info("开始执行本地事务:{},{}", orderId, o.toString(), res);
//存储回查状态
resMap.put(orderId, res);
//commit成功 UNKNOWN未知状态 ROLLBACK在这一步就直接回滚了
return res ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.UNKNOWN;
}
//UNKNOWN、服务重启、无响应情况下提供事务回查 最终状态自己控制
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String orderId = new String((byte[]) message.getPayload());
boolean res = resMap.get(orderId);
log.info("执行事务检查:{},{}", orderId, res);
return res ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "transaction_topic", consumerGroup = "consume_group1")
public class RocketmqConsume1 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("收到返利消息:{},{}", s, new Date().getTime());
}
}
c.e.rocketmq.consume.TransactionExcetor : 开始执行本地事务:c82fae4d-6203-446e-976d-10de860fff9b,1
c.e.rocketmq.consume.TransactionExcetor : 开始执行本地事务:4cf60f44-f088-48c0-bdb7-c73ee7af53f9,2
c.e.rocketmq.consume.TransactionExcetor : 开始执行本地事务:8b49aefd-1057-44cc-a832-3e6882d31962,3
c.e.rocketmq.consume.RocketmqConsume1 : 收到返利消息:8b49aefd-1057-44cc-a832-3e6882d31962,1656054589418
c.e.rocketmq.consume.TransactionExcetor : 开始执行本地事务:04fe4c7b-5658-4499-b2ec-087a74db6177,4
c.e.rocketmq.consume.TransactionExcetor : 开始执行本地事务:19ad92dd-8f20-416a-95ae-5ed5247a8df5,5
c.e.rocketmq.consume.TransactionExcetor : 开始执行本地事务:fa77df40-d2ae-4cd5-a947-2e61eb61f62a,6
c.e.rocketmq.consume.RocketmqConsume1 : 收到返利消息:fa77df40-d2ae-4cd5-a947-2e61eb61f62a,1656054592444
c.e.rocketmq.consume.TransactionExcetor : 开始执行本地事务:8724bd01-4cc9-4ae4-b19a-27caeeacfaff,7
c.e.rocketmq.consume.TransactionExcetor : 开始执行本地事务:b6310015-ae72-4e1b-ba0f-1d8273aa81c1,8
c.e.rocketmq.consume.TransactionExcetor : 开始执行本地事务:a81e290a-3611-4a28-b41c-e4a36cf99a4a,9
c.e.rocketmq.consume.RocketmqConsume1 : 收到返利消息:a81e290a-3611-4a28-b41c-e4a36cf99a4a,1656054595464
c.e.rocketmq.consume.TransactionExcetor : 执行事务检查:4cf60f44-f088-48c0-bdb7-c73ee7af53f9,false
c.e.rocketmq.consume.TransactionExcetor : 执行事务检查:c82fae4d-6203-446e-976d-10de860fff9b,false
c.e.rocketmq.consume.TransactionExcetor : 执行事务检查:04fe4c7b-5658-4499-b2ec-087a74db6177,false
c.e.rocketmq.consume.TransactionExcetor : 执行事务检查:19ad92dd-8f20-416a-95ae-5ed5247a8df5,false
c.e.rocketmq.consume.TransactionExcetor : 执行事务检查:8724bd01-4cc9-4ae4-b19a-27caeeacfaff,false
c.e.rocketmq.consume.TransactionExcetor : 执行事务检查:b6310015-ae72-4e1b-ba0f-1d8273aa81c1,false
特别注意:多个监听者情况下,consumeGroup一定不要是同一个,否则消息会丢失
其他扩展:
//多个生产者 可以手写连接代码
new DefaultMQProducer
//多个消费者 同上
new DefaultMQPushConsumer
以上就是本章的全部内容了。
上一篇:RocketMQ第一话 – Docker安装以及Springboot集成RocketMQ
下一篇:RocketMQ第三话 – RocketMQ高可用集群搭建
车到山前必有路,柳暗花明又一村