• rocketmq消息发送能不能接收,具体什么情况下能接收,代码演示答疑解惑


    目录

             一、前言

    二、集群模式实践

    三、广播模式实践


    一、前言

    我们知道RocketMQ主要提供了两种消费模式:集群消费以及广播消费,默认的是集群模式,而他具体的消息模式是由消费者来订阅的,可是他究竟什么情况下能收到消息,什么时候不能收到消息,网上也没有明确的说明,只有一句模糊的话:集群模式是指 RocketMQ 中的一条消息只能被同一个消费者组中的一个消费者消费;默认是集群模式;

    如下图,Producer 向 TopicTest 这个 Topic 并发写入 3 条新消息,分别被分配到了 MessageQueue1~MessageQueue3 这 3 个队列,然后 Group 中的三个 Consumer 分别消费了一条消息:
     

    广播模式是 RocketMQ 中的消息会被消费组中的每个消费者都消费一次,如下图:
     

    看概念总是让人有点蒙,下面让我们来试试,看看究竟什么情况下能接收到,什么情况下不能接收到。

    生产者关键代码示例:

    1. /**
    2. * @description:
    3. * @author:
    4. * @date: 2022/11/3 20:30
    5. */
    6. @Component
    7. public class DataSyncSender {
    8. private final Logger logger = LoggerFactory.getLogger(DataSyncSender.class);
    9. @Resource
    10. private RocketMQTemplate rocketMQTemplate;
    11. public void syncSysMenuMessage(String message) {
    12. try {
    13. SendResult sendResult = rocketMQTemplate.syncSend("a", MessageBuilder.withPayload(message).build());
    14. logger.info("同步发送消息sendResult:{}", JSON.toJSONString(sendResult));
    15. } catch (Exception e) {
    16. logger.error("同步发送消息error:{}" ,e.getMessage());
    17. }
    18. }
    19. }

    二、集群模式实践

    概念:集群模式 RocketMQ 中的一条消息只能被同一个消费者组中的一个消费者消费

    1、消费者 topic=a,consumerGroup=b,这个代码你发两台服务器,他只能在一台服务器上消费一次;

    结论:一个消费组中同一个topic,只能消费一次

    2、两个消费者,topic一样,consumerGroup不一样,跑起来,发现每个都能接收到消息

    结论:不同消费组中使用同一个topic,两个消费者都能消费到

    消息者代码示例:

    1. /**
    2. * @description:
    3. * @author:
    4. * @date: 2022/11/3 20:30
    5. */
    6. @Slf4j
    7. @Service
    8. @RocketMQMessageListener(topic = "a", consumerGroup = "TABLE_INFO_MESSAGE_1")
    9. public class DataSyncReceive implements RocketMQListener {
    10. private final Logger logger = LoggerFactory.getLogger(DataSyncReceive.class);
    11. /**
    12. *
    13. * @param message
    14. */
    15. @Override
    16. public void onMessage(String message) {
    17. logger.info("接收信息组1 message:{}", message);
    18. }
    19. }
    20. /**
    21. * @description:
    22. * @author:
    23. * @date: 2022/11/3 20:30
    24. */
    25. @Slf4j
    26. @Service
    27. @RocketMQMessageListener(topic = "a", consumerGroup = "TABLE_INFO_MESSAGE_2")
    28. public class DataSyncReceive2 implements RocketMQListener {
    29. private final Logger logger = LoggerFactory.getLogger(DataSyncReceive.class);
    30. /**
    31. *
    32. * @param message
    33. */
    34. @Override
    35. public void onMessage(String message) {
    36. logger.info("接收信息组1 message:{}", message);
    37. }
    38. }

    三、广播模式实践

    rocketmq消息模式是在消息者来定义的,加一个 messageModel= MessageModel.BROADCASTING

    @RocketMQMessageListener(topic = "a", consumerGroup = "TABLE_INFO_MESSAGE_GROUP",messageModel= MessageModel.BROADCASTING)

    1、启动两个监听他们的 topic 、consumerGroup 分别都一样,两个都可以正常接收数据;

    2、启动两个监听他们的 topic 一样、consumerGroup不一样,两个都可以正常接收数据;

    综合来看,广播模式功能还是非常强大的;

    但是如果多个消费者,有的定义为集群模式,有的人定义为广播模式,结果又会如何呢?

    经过验证:两个消费者都能收到全部的topic消息;

  • 相关阅读:
    node+vue3+mysql前后分离开发范式——实现视频文件上传并渲染
    LightDB中的存储过程(六)
    【网站项目】“最多跑一次”小程序
    第十三章第三节:Java数据结构预备知识之泛型
    艾美捷内毒素检测试剂盒参数和化学性质说明
    嵌入式分享合集26
    大数据Flink(一百):SQL自定义函数(UDF)和标量函数(Scalar Function)
    长路漫漫、技术作伴
    Linux篇16进程信号第一部分
    外汇市场如何监管?
  • 原文地址:https://blog.csdn.net/qq_43384381/article/details/127678671