• Springboot 集成 RocketMQ(进阶-消息)


    0. 入门篇

    Springboot 集成 RocketMq(入门)-CSDN博客

    本文主要使用RocketMQTemplate完成生成消息。

    1. 异步消息

    1.1 生产者

    1. @Resource
    2. private RocketMQTemplate rocketMqTemplate;
    3. @GetMapping("/send/async/{messageBody}")
    4. public String sendAsyncMsg(@PathVariable("messageBody") String messageBody) {
    5. rocketMqTemplate.asyncSend("async", messageBody, new SendCallback() {
    6. @Override
    7. public void onSuccess(SendResult sendResult) {
    8. log.info("sendAsyncMsg success messageBody:{}", messageBody);
    9. }
    10. @Override
    11. public void onException(Throwable throwable) {
    12. log.error("sendAsyncMsg fail messageBody:{}", messageBody);
    13. }
    14. });
    15. log.info("sendAsyncMsg operate oK");
    16. return "OK";
    17. }

    1.2 消费者

    1. @Component
    2. @RocketMQMessageListener(topic = "async", consumerGroup = "async")
    3. @Slf4j
    4. public class MyAsyncConsumer implements RocketMQListener {
    5. @Override
    6. public void onMessage(String message) {
    7. // 处理消息的逻辑
    8. log.info("Received async message: {}", message);
    9. }
    10. }

    通过日志可以发现,主线程结束线程日志先于异步日志打印完成。

    2.广播消息

    2.1 生产者

    1. @Resource
    2. private RocketMQTemplate rocketMqTemplate;
    3. @GetMapping("/send/broadcast/{messageBody}")
    4. public String sendBroadcastMsg(@PathVariable("messageBody") String messageBody) {
    5. // 单向不可靠消息 void 方法无返回值
    6. SendResult sendResult = rocketMqTemplate.syncSend("broadcast", messageBody);
    7. log.info("sendBroadcastMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
    8. return "OK";
    9. }

    2.2 消费者

    消费者监听注解上设置 messageModel = MessageModel.BROADCASTING,默认是(MessageModel.CLUSTERING)。

    1. @Component
    2. @RocketMQMessageListener(topic = "broadcast", consumerGroup = "broadcast1", messageModel = MessageModel.BROADCASTING )
    3. @Slf4j
    4. public class MyBroadcast1Consumer implements RocketMQListener {
    5. @Override
    6. public void onMessage(String message) {
    7. // 处理消息的逻辑
    8. log.info("MyBroadcast1Consumer Received broadcast message: {}", message);
    9. }
    10. }
    11. @Component
    12. @RocketMQMessageListener(topic = "broadcast", consumerGroup = "broadcast2", messageModel = MessageModel.BROADCASTING )
    13. @Slf4j
    14. public class MyBroadcast2Consumer implements RocketMQListener {
    15. @Override
    16. public void onMessage(String message) {
    17. // 处理消息的逻辑
    18. log.info("MyBroadcast2Consumer Received broadcast message: {}", message);
    19. }
    20. }

    3. 延时、定时消息

    3.1 延时消息

    3.1.1 生产者

    1. @Resource
    2. private RocketMQTemplate rocketMqTemplate;
    3. @GetMapping("/send/delayed/{messageBody}")
    4. public String sendDelayedMsg(@PathVariable("messageBody") String messageBody) {
    5. // 延时10秒发送
    6. SendResult sendResult = rocketMqTemplate.syncSendDelayTimeSeconds("delayed", messageBody, 10L);
    7. log.info("sendDelayedMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
    8. return "OK";
    9. }

    3.1.2 消费者

    1. @Component
    2. @RocketMQMessageListener(topic = "delayed", consumerGroup = "delayed")
    3. @Slf4j
    4. public class MyDelayedConsumer implements RocketMQListener {
    5. @Override
    6. public void onMessage(Message message) {
    7. // 处理消息的逻辑
    8. log.info("Received delayed message: {}", message);
    9. }
    10. }

    3.2 定时消息

    3.2.1 生产者

    message.setDeliverTimeMs(System.currentTimeMillis() + 10);

    1. @Resource
    2. private RocketMQTemplate rocketMqTemplate;
    3. @GetMapping("/send/scheduled/{messageBody}")
    4. public String sendScheduledMsg(@PathVariable("messageBody") String messageBody) {
    5. // 指定时间发送 (当前时间 + 30秒)
    6. SendResult sendResult = rocketMqTemplate.syncSendDeliverTimeMills("scheduled", messageBody, (System.currentTimeMillis() + (30L * 1000L)));
    7. log.info("sendDelayedMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
    8. return "OK";
    9. }

    3.2.2 消费者

    1. @Component
    2. @RocketMQMessageListener(topic = "scheduled", consumerGroup = "scheduled")
    3. @Slf4j
    4. public class MyScheduledConsumer implements RocketMQListener {
    5. @Override
    6. public void onMessage(Message message) {
    7. // 处理消息的逻辑
    8. log.info("Received scheduled message: {}", message);
    9. }
    10. }

    日志显示时间存在误差,网络等通讯耗时可以忽略。

    4.批量消息

    4.1 生产者

    1. @Resource
    2. private RocketMQTemplate rocketMqTemplate;
    3. @GetMapping("/send/batch/{messageBody}")
    4. public String sendBatchMsg(@PathVariable("messageBody") String messageBody) {
    5. List> messageList = new ArrayList<>();
    6. for (int i = 0; i < 10; i++) {
    7. messageList.add(MessageBuilder.withPayload(messageBody + i).build());
    8. }
    9. SendResult sendResult = rocketMqTemplate.syncSend("batch", messageList);
    10. log.info("sendDelayedMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
    11. return "OK";
    12. }

    4.2 消费者

    1. @Component
    2. @RocketMQMessageListener(topic = "batch", consumerGroup = "batch")
    3. @Slf4j
    4. public class MyBatchConsumer implements RocketMQListener {
    5. @Override
    6. public void onMessage(String message) {
    7. // 处理消息的逻辑
    8. log.info("Received batch message: {}", message);
    9. }
    10. }

    5.顺序消息

    5.1 局部有序

    局部消息指的是消费者消费某个topic的某个队列中的消息是顺序的【队列扩容时操作部分数据乱序】。

    5.1.1 生产者

    1. @Resource
    2. private RocketMQTemplate rocketMqTemplate;
    3. @GetMapping("/send/partOrder/{targetId}/{messageBody}")
    4. public String sendPartOrderMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {
    5. SendResult sendResult = rocketMqTemplate.syncSendOrderly("partOrder", (targetId + "_" + messageBody), "0");
    6. log.info("sendPartOrderMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
    7. return "OK";
    8. }

    5.1.2 消费者

    1. @Component
    2. @RocketMQMessageListener(topic = "partOrder", consumerGroup = "partOrder", consumeMode = ConsumeMode.ORDERLY)
    3. @Slf4j
    4. public class MyPartOrderConsumer implements RocketMQListener {
    5. @Override
    6. public void onMessage(String message) {
    7. // 处理消息的逻辑
    8. log.info("Received partOrder message: {}", message);
    9. }
    10. }

    5.2 全局有序

    消费者消费全部消息都是顺序的,只能通过一个某个topic只有一个队列才能实现,这种应用场景较少,且性能较差。【向唯一队列中发送消息,队列无法扩展】。

    5.2.1 生产者

    1. @Resource
    2. private RocketMQTemplate rocketMqTemplate;
    3. @GetMapping("/send/overAllOrder/{targetId}/{messageBody}")
    4. public String sendOverallOrderMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {
    5. SendResult sendResult = rocketMqTemplate.syncSendOrderly("overAllOrder", (targetId + "_" + messageBody), targetId.toString());
    6. log.info("sendOverallOrderMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());
    7. return "OK";
    8. }

    5.2.2 消费者

    1. @Component
    2. @RocketMQMessageListener(topic = "overallOrder", consumerGroup = "overallOrder", consumeMode = ConsumeMode.ORDERLY)
    3. @Slf4j
    4. public class MyOverallOrderConsumer implements RocketMQListener {
    5. @Override
    6. public void onMessage(String message) {
    7. // 处理消息的逻辑
    8. log.info("Received overallOrder1 message: {}", message);
    9. }
    10. }

    6.事务消息

    6.1 生产者

    1. @RestController
    2. @Slf4j
    3. public class TransactionProducerController {
    4. @Resource
    5. private RocketMQTemplate rocketMqTemplate;
    6. @GetMapping("/send/transaction/{targetId}/{messageBody}")
    7. public String sendTransactionMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {
    8. Message message = MessageBuilder.withPayload(messageBody).build();
    9. TransactionSendResult transaction = rocketMqTemplate.sendMessageInTransaction("transaction", message, targetId);
    10. return transaction.getTransactionId();
    11. }
    12. }
    13. @Slf4j
    14. @RocketMQTransactionListener
    15. public class TransactionMsgListener implements RocketMQLocalTransactionListener {
    16. /** 执行本地事务(在发送消息成功时执行) */
    17. @Override
    18. public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    19. //模拟一个处理结果
    20. int index=2;
    21. /**
    22. * 模拟返回事务状态
    23. */
    24. switch (index){
    25. case 1:
    26. //处理业务
    27. String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
    28. log.info("本地事务回滚,回滚消息,"+jsonStr);
    29. //返回ROLLBACK状态的消息会被丢弃
    30. return RocketMQLocalTransactionState.ROLLBACK;
    31. case 2:
    32. //返回UNKNOW状态的消息会等待Broker进行事务状态回查
    33. log.info("需要等待Broker进行事务状态回查");
    34. return RocketMQLocalTransactionState.UNKNOWN;
    35. default:
    36. log.info("事务提交,消息正常处理");
    37. //返回COMMIT状态的消息会立即被消费者消费到
    38. return RocketMQLocalTransactionState.COMMIT;
    39. }
    40. }
    41. /**
    42. * 检查本地事务的状态
    43. * 回查间隔时间:系统默认每隔60秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。
    44. * 第一次消息回查最快
    45. */
    46. @Override
    47. public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
    48. String transactionId = message.getHeaders().get("__transactionId__").toString();
    49. log.info("检查本地事务状态,transactionId:{}", transactionId);
    50. return RocketMQLocalTransactionState.COMMIT;
    51. }
    52. }

    6.2 消费者

    1. @Component
    2. @RocketMQMessageListener(topic = "transaction", consumerGroup = "transaction")
    3. @Slf4j
    4. public class MyTransactionConsumer implements RocketMQListener {
    5. @Override
    6. public void onMessage(String message) {
    7. // 处理消息的逻辑
    8. log.info("Received transaction message: {}", message);
    9. }
    10. }

    7.单项消息

    单向发送只负责发送消息,不等待RocketMQ服务器返回的发送结果,也不提供回调函数来接收RocketMQ服务器的响应结果,只负责发送至于发送成功还是发送失败并不考虑。通常用于对可靠性要求不高的场景。

    7.1 生产者

    1. @Resource
    2. private RocketMQTemplate rocketMqTemplate;
    3. @GetMapping("/send/oneWay/{messageBody}")
    4. public String sendOneWayMsg(@PathVariable("messageBody") String messageBody) {
    5. // 单向不可靠消息 void 方法无返回值
    6. rocketMqTemplate.sendOneWay("oneWay", messageBody);
    7. return "OK";
    8. }

    7.2 消费者

    1. @Component
    2. @RocketMQMessageListener(topic = "oneWay", consumerGroup = "oneWay")
    3. @Slf4j
    4. public class MyOneWayConsumer implements RocketMQListener {
    5. @Override
    6. public void onMessage(String message) {
    7. // 处理消息的逻辑
    8. log.info("Received oneWay message: {}", message);
    9. }
    10. }

  • 相关阅读:
    [机缘参悟-66]:怎样才能让别人愿意帮你:利益共享法则、“大道”、“人性”
    三菱PLC FX3U脉冲轴点动功能块(MC_Jog)
    攻防世界题目练习——Crypto密码新手+引导模式(二)(持续更新)
    读书记录 《就算生得暗淡,也要活得光彩》
    LeetCode刷题系列 -- 78. 子集
    TDengine3.0:解决高基数问题的时序数据库设计思路
    nodejs+vue网络课程在线考试系统an7ib
    5.Vue-在Vue框架中实现Vue的增删改查
    网络摄像头(IPC)介绍:类型、供电、镜头、夜视等
    最新版手机软件App下载排行网站源码/App应用商店源码
  • 原文地址:https://blog.csdn.net/qq_34253002/article/details/134243344