目录
我们知道RocketMQ主要提供了两种消费模式:集群消费以及广播消费,默认的是集群模式,而他具体的消息模式是由消费者来订阅的,可是他究竟什么情况下能收到消息,什么时候不能收到消息,网上也没有明确的说明,只有一句模糊的话:集群模式是指 RocketMQ 中的一条消息只能被同一个消费者组中的一个消费者消费;默认是集群模式;
如下图,Producer 向 TopicTest 这个 Topic 并发写入 3 条新消息,分别被分配到了 MessageQueue1~MessageQueue3 这 3 个队列,然后 Group 中的三个 Consumer 分别消费了一条消息:
广播模式是 RocketMQ 中的消息会被消费组中的每个消费者都消费一次,如下图:
看概念总是让人有点蒙,下面让我们来试试,看看究竟什么情况下能接收到,什么情况下不能接收到。
生产者关键代码示例:
- /**
- * @description:
- * @author:
- * @date: 2022/11/3 20:30
- */
- @Component
- public class DataSyncSender {
-
- private final Logger logger = LoggerFactory.getLogger(DataSyncSender.class);
- @Resource
- private RocketMQTemplate rocketMQTemplate;
-
-
- public void syncSysMenuMessage(String message) {
- try {
- SendResult sendResult = rocketMQTemplate.syncSend("a", MessageBuilder.withPayload(message).build());
- logger.info("同步发送消息sendResult:{}", JSON.toJSONString(sendResult));
- } catch (Exception e) {
- logger.error("同步发送消息error:{}" ,e.getMessage());
- }
- }
-
- }
概念:集群模式 RocketMQ 中的一条消息只能被同一个消费者组中的一个消费者消费
1、消费者 topic=a,consumerGroup=b,这个代码你发两台服务器,他只能在一台服务器上消费一次;
结论:一个消费组中同一个topic,只能消费一次;
2、两个消费者,topic一样,consumerGroup不一样,跑起来,发现每个都能接收到消息
结论:不同消费组中使用同一个topic,两个消费者都能消费到;
消息者代码示例:
- /**
- * @description:
- * @author:
- * @date: 2022/11/3 20:30
- */
- @Slf4j
- @Service
- @RocketMQMessageListener(topic = "a", consumerGroup = "TABLE_INFO_MESSAGE_1")
- public class DataSyncReceive implements RocketMQListener
{ -
- private final Logger logger = LoggerFactory.getLogger(DataSyncReceive.class);
-
-
- /**
- *
- * @param message
- */
- @Override
- public void onMessage(String message) {
- logger.info("接收信息组1 message:{}", message);
-
- }
- }
-
-
- /**
- * @description:
- * @author:
- * @date: 2022/11/3 20:30
- */
- @Slf4j
- @Service
- @RocketMQMessageListener(topic = "a", consumerGroup = "TABLE_INFO_MESSAGE_2")
- public class DataSyncReceive2 implements RocketMQListener
{ -
- private final Logger logger = LoggerFactory.getLogger(DataSyncReceive.class);
-
-
- /**
- *
- * @param message
- */
- @Override
- public void onMessage(String message) {
- logger.info("接收信息组1 message:{}", message);
-
- }
- }
rocketmq消息模式是在消息者来定义的,加一个 messageModel= MessageModel.BROADCASTING
@RocketMQMessageListener(topic = "a", consumerGroup = "TABLE_INFO_MESSAGE_GROUP",messageModel= MessageModel.BROADCASTING)
1、启动两个监听,他们的 topic 、consumerGroup 分别都一样,两个都可以正常接收数据;
2、启动两个监听,他们的 topic 一样、consumerGroup不一样,两个都可以正常接收数据;
综合来看,广播模式功能还是非常强大的;
但是如果多个消费者,有的定义为集群模式,有的人定义为广播模式,结果又会如何呢?
经过验证:两个消费者都能收到全部的topic消息;