Springboot 集成 RocketMq(入门)-CSDN博客
本文主要使用RocketMQTemplate完成消息的生产(发送)。
- @Resource
- private RocketMQTemplate rocketMqTemplate;
-
- @GetMapping("/send/async/{messageBody}")
- public String sendAsyncMsg(@PathVariable("messageBody") String messageBody) {
- rocketMqTemplate.asyncSend("async", messageBody, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- log.info("sendAsyncMsg success messageBody:{}", messageBody);
- }
-
- @Override
- public void onException(Throwable throwable) {
- log.error("sendAsyncMsg fail messageBody:{}", messageBody);
- }
- });
- log.info("sendAsyncMsg operate oK");
- return "OK";
- }
- @Component
- @RocketMQMessageListener(topic = "async", consumerGroup = "async")
- @Slf4j
- public class MyAsyncConsumer implements RocketMQListener
{ -
- @Override
- public void onMessage(String message) {
- // 处理消息的逻辑
- log.info("Received async message: {}", message);
- }
-
- }

通过日志可以发现,主线程的结束日志先于异步发送回调的日志打印完成。
- @Resource
- private RocketMQTemplate rocketMqTemplate;
-
- @GetMapping("/send/broadcast/{messageBody}")
- public String sendBroadcastMsg(@PathVariable("messageBody") String messageBody) {
- // 单向不可靠消息 void 方法无返回值
- SendResult sendResult = rocketMqTemplate.syncSend("broadcast", messageBody);
- log.info("sendBroadcastMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
- return "OK";
- }
消费者监听注解上设置 messageModel = MessageModel.BROADCASTING,默认是(MessageModel.CLUSTERING)。
- @Component
- @RocketMQMessageListener(topic = "broadcast", consumerGroup = "broadcast1", messageModel = MessageModel.BROADCASTING )
- @Slf4j
- public class MyBroadcast1Consumer implements RocketMQListener
{ -
- @Override
- public void onMessage(String message) {
- // 处理消息的逻辑
- log.info("MyBroadcast1Consumer Received broadcast message: {}", message);
- }
-
- }
-
- @Component
- @RocketMQMessageListener(topic = "broadcast", consumerGroup = "broadcast2", messageModel = MessageModel.BROADCASTING )
- @Slf4j
- public class MyBroadcast2Consumer implements RocketMQListener
{ -
- @Override
- public void onMessage(String message) {
- // 处理消息的逻辑
- log.info("MyBroadcast2Consumer Received broadcast message: {}", message);
- }
-
- }

- @Resource
- private RocketMQTemplate rocketMqTemplate;
-
- @GetMapping("/send/delayed/{messageBody}")
- public String sendDelayedMsg(@PathVariable("messageBody") String messageBody) {
- // 延时10秒发送
- SendResult sendResult = rocketMqTemplate.syncSendDelayTimeSeconds("delayed", messageBody, 10L);
- log.info("sendDelayedMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
- return "OK";
- }
- @Component
- @RocketMQMessageListener(topic = "delayed", consumerGroup = "delayed")
- @Slf4j
- public class MyDelayedConsumer implements RocketMQListener
{ -
- @Override
- public void onMessage(Message message) {
- // 处理消息的逻辑
- log.info("Received delayed message: {}", message);
- }
-
- }

message.setDeliverTimeMs(System.currentTimeMillis() + 10);
- @Resource
- private RocketMQTemplate rocketMqTemplate;
-
- @GetMapping("/send/scheduled/{messageBody}")
- public String sendScheduledMsg(@PathVariable("messageBody") String messageBody) {
- // 指定时间发送 (当前时间 + 30秒)
- SendResult sendResult = rocketMqTemplate.syncSendDeliverTimeMills("scheduled", messageBody, (System.currentTimeMillis() + (30L * 1000L)));
- log.info("sendDelayedMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
- return "OK";
- }
- @Component
- @RocketMQMessageListener(topic = "scheduled", consumerGroup = "scheduled")
- @Slf4j
- public class MyScheduledConsumer implements RocketMQListener
{ -
- @Override
- public void onMessage(Message message) {
- // 处理消息的逻辑
- log.info("Received scheduled message: {}", message);
- }
-
- }

日志显示时间存在误差,网络等通讯耗时可以忽略。
- @Resource
- private RocketMQTemplate rocketMqTemplate;
-
- @GetMapping("/send/batch/{messageBody}")
- public String sendBatchMsg(@PathVariable("messageBody") String messageBody) {
- List
> messageList = new ArrayList<>(); - for (int i = 0; i < 10; i++) {
- messageList.add(MessageBuilder.withPayload(messageBody + i).build());
- }
- SendResult sendResult = rocketMqTemplate.syncSend("batch", messageList);
- log.info("sendDelayedMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
- return "OK";
- }
- @Component
- @RocketMQMessageListener(topic = "batch", consumerGroup = "batch")
- @Slf4j
- public class MyBatchConsumer implements RocketMQListener
{ -
- @Override
- public void onMessage(String message) {
- // 处理消息的逻辑
- log.info("Received batch message: {}", message);
- }
-
- }

局部顺序消息指的是消费者消费某个topic的某个队列中的消息是顺序的【队列扩容时会造成部分数据乱序】。
- @Resource
- private RocketMQTemplate rocketMqTemplate;
-
- @GetMapping("/send/partOrder/{targetId}/{messageBody}")
- public String sendPartOrderMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {
- SendResult sendResult = rocketMqTemplate.syncSendOrderly("partOrder", (targetId + "_" + messageBody), "0");
- log.info("sendPartOrderMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
- return "OK";
- }
- @Component
- @RocketMQMessageListener(topic = "partOrder", consumerGroup = "partOrder", consumeMode = ConsumeMode.ORDERLY)
- @Slf4j
- public class MyPartOrderConsumer implements RocketMQListener
{ -
- @Override
- public void onMessage(String message) {
- // 处理消息的逻辑
- log.info("Received partOrder message: {}", message);
- }
-
- }
消费者消费的全部消息都是顺序的,只能通过让某个topic只有一个队列来实现,这种应用场景较少,且性能较差。【向唯一队列中发送消息,队列无法扩展】。
- @Resource
- private RocketMQTemplate rocketMqTemplate;
-
- @GetMapping("/send/overAllOrder/{targetId}/{messageBody}")
- public String sendOverallOrderMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {
- SendResult sendResult = rocketMqTemplate.syncSendOrderly("overAllOrder", (targetId + "_" + messageBody), targetId.toString());
- log.info("sendOverallOrderMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
- return "OK";
- }
- @Component
- @RocketMQMessageListener(topic = "overallOrder", consumerGroup = "overallOrder", consumeMode = ConsumeMode.ORDERLY)
- @Slf4j
- public class MyOverallOrderConsumer implements RocketMQListener
{ -
- @Override
- public void onMessage(String message) {
- // 处理消息的逻辑
- log.info("Received overallOrder1 message: {}", message);
- }
-
- }
- @RestController
- @Slf4j
- public class TransactionProducerController {
-
- @Resource
- private RocketMQTemplate rocketMqTemplate;
-
- @GetMapping("/send/transaction/{targetId}/{messageBody}")
- public String sendTransactionMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {
- Message message = MessageBuilder.withPayload(messageBody).build();
- TransactionSendResult transaction = rocketMqTemplate.sendMessageInTransaction("transaction", message, targetId);
- return transaction.getTransactionId();
- }
-
- }
-
- @Slf4j
- @RocketMQTransactionListener
- public class TransactionMsgListener implements RocketMQLocalTransactionListener {
-
-
- /** 执行本地事务(在发送消息成功时执行) */
- @Override
- public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
- //模拟一个处理结果
- int index=2;
- /**
- * 模拟返回事务状态
- */
- switch (index){
- case 1:
- //处理业务
- String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
- log.info("本地事务回滚,回滚消息,"+jsonStr);
- //返回ROLLBACK状态的消息会被丢弃
- return RocketMQLocalTransactionState.ROLLBACK;
- case 2:
- //返回UNKNOW状态的消息会等待Broker进行事务状态回查
- log.info("需要等待Broker进行事务状态回查");
- return RocketMQLocalTransactionState.UNKNOWN;
- default:
- log.info("事务提交,消息正常处理");
- //返回COMMIT状态的消息会立即被消费者消费到
- return RocketMQLocalTransactionState.COMMIT;
- }
- }
- /**
- * 检查本地事务的状态
- * 回查间隔时间:系统默认每隔60秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。
- * 第一次消息回查最快
- */
- @Override
- public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
- String transactionId = message.getHeaders().get("__transactionId__").toString();
- log.info("检查本地事务状态,transactionId:{}", transactionId);
- return RocketMQLocalTransactionState.COMMIT;
- }
- }
- @Component
- @RocketMQMessageListener(topic = "transaction", consumerGroup = "transaction")
- @Slf4j
- public class MyTransactionConsumer implements RocketMQListener
{ -
- @Override
- public void onMessage(String message) {
- // 处理消息的逻辑
- log.info("Received transaction message: {}", message);
- }
-
- }

单向发送只负责发送消息,不等待RocketMQ服务器返回的发送结果,也不提供回调函数来接收RocketMQ服务器的响应结果;只负责发送,至于发送成功还是发送失败并不考虑。通常用于对可靠性要求不高的场景。
- @Resource
- private RocketMQTemplate rocketMqTemplate;
-
- @GetMapping("/send/oneWay/{messageBody}")
- public String sendOneWayMsg(@PathVariable("messageBody") String messageBody) {
- // 单向不可靠消息 void 方法无返回值
- rocketMqTemplate.sendOneWay("oneWay", messageBody);
- return "OK";
- }
- @Component
- @RocketMQMessageListener(topic = "oneWay", consumerGroup = "oneWay")
- @Slf4j
- public class MyOneWayConsumer implements RocketMQListener
{ -
- @Override
- public void onMessage(String message) {
- // 处理消息的逻辑
- log.info("Received oneWay message: {}", message);
- }
-
- }