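RocketMQ has long provided a set of best practices, but developers working on the front line rarely know about them. Ad hoc code scattered across projects often causes messages to get mixed up, which seriously undermines business correctness. To make sure the best practices are actually followed, to lower the cost of use for front-line developers, and to unify MQ usage conventions, they need to be abstracted and encapsulated…
RocketMQ's best practices recommend using a single Topic per application wherever possible and identifying message subtypes with tags, which the application is free to define.
When sending a message with rocketMQTemplate, the destination is set through the send method's destination parameter. Its format is topicName:tagName, where the part before the colon is the topic name and the part after it is the tag name. A simple example:
- // Build the destination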
- protected String createDestination(String topic, String tag) {
- if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
- return topic + ":" + tag;
- }else {
- return topic;
- }
- }
- // Send the message
- String destination = createDestination(topic, tag);
- SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);
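Judging by its name, tags looks like a plural, but when sending a message the destination can name only one tag under one topic, not several.
Consuming messages, however, is less convenient. A simple example: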
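On the consuming side, by contrast, one subscription can cover several tags: RocketMQ tag expressions accept `*` for every tag or a `||`-separated list of tags. The sketch below only illustrates that matching rule in plain Java. The class `TagExpressionSketch` and the helper `matches` are hypothetical names introduced for illustration and are not part of the RocketMQ client API.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

class TagExpressionSketch {

    // Returns true when a message's single tag is accepted by a subscription
    // expression such as "*" or "UserCreatedEvent || UserEnableEvent".
    static boolean matches(String expression, String messageTag) {
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true; // an empty expression or "*" accepts every tag
        }
        Set<String> accepted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return accepted.contains(messageTag);
    }

    public static void main(String[] args) {
        String expression = "UserCreatedEvent || UserEnableEvent";
        System.out.println(matches(expression, "UserCreatedEvent"));
        System.out.println(matches(expression, "UserDeletedEvent"));
        System.out.println(matches("*", "UserDeletedEvent"));
    }
}
```

Each message still carries exactly one tag; only the subscription side lists several.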
- @Service
- @RocketMQMessageListener(
- topic = "consumer-test-topic-1",
- consumerGroup ="user-message-consumer-1",
- selectorExpression = "*",
- consumeMode = ConsumeMode.ORDERLY
- )
- @Slf4j
- public class RocketBasedUserMessageConsumer extends UserMessageConsumer
- implements RocketMQListener<MessageExt> {
- @Override
- public void onMessage(MessageExt message) {
- String tag = message.getTags();
- byte[] body = message.getBody();
- log.info("handle msg body {}", new String(body));
- switch (tag){
- case "UserCreatedEvent":
- UserEvents.UserCreatedEvent createdEvent = JSON.parseObject(body, UserEvents.UserCreatedEvent.class);
- handle(createdEvent);
- return;
- case "UserEnableEvent":
- UserEvents.UserEnableEvent enableEvent = JSON.parseObject(body, UserEvents.UserEnableEvent.class);
- handle(enableEvent);
- return;
- case "UserDisableEvent":
- UserEvents.UserDisableEvent disableEvent = JSON.parseObject(body, UserEvents.UserDisableEvent.class);
- handle(disableEvent);
- return;
- case "UserDeletedEvent":
- UserEvents.UserDeletedEvent deletedEvent = JSON.parseObject(body, UserEvents.UserDeletedEvent.class);
- handle(deletedEvent);
- return;
- }
- }
- }
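This approach has several problems:
The goal is a business-scenario-oriented pattern that allows flexible business extension, with the following characteristics:
The framework relies on rocketmq-spring-boot-starter to send and receive messages.
First, add the RocketMQ dependency.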
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.1</version>
- </dependency>
Then add the lego starter.
- <dependency>
- <groupId>com.geekhalo.lego</groupId>
- <artifactId>lego-starter</artifactId>
- <version>0.1.13-tag_based_dispatcher_message_consumer-SNAPSHOT</version>
- </dependency>
Add the RocketMQ configuration to application.yml.
- rocketmq:
-   name-server: http://127.0.0.1:9876
-   producer:
-     group: rocket-demo
To define a consumer, you only need to:
For example:
- @TagBasedDispatcherMessageConsumer(
- topic = "consumer-test-topic",
- consumer = "user-message-consumer"
- )
- public class UserMessageConsumer {
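- // Handlers may be called from several consumer threads, so a concurrent map is safer here
- private final Map<Long, List<UserEvents.UserEvent>> events = new java.util.concurrent.ConcurrentHashMap<>();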
- public void clean(){
- this.events.clear();
- }
- public List<UserEvents.UserEvent> getUserEvents(Long userId){
- return this.events.get(userId);
- }
- @HandleTag("UserCreatedEvent")
- public void handle(UserEvents.UserCreatedEvent userCreatedEvent){
- List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userCreatedEvent.getUserId(), userId -> new ArrayList<>());
- userEvents.add(userCreatedEvent);
- }
- @HandleTag("UserEnableEvent")
- public void handle(UserEvents.UserEnableEvent userEnableEvent){
- List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userEnableEvent.getUserId(), userId -> new ArrayList<>());
- userEvents.add(userEnableEvent);
- }
- @HandleTag("UserDisableEvent")
- public void handle(UserEvents.UserDisableEvent userDisableEvent){
- List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDisableEvent.getUserId(), userId -> new ArrayList<>());
- userEvents.add(userDisableEvent);
- }
- @HandleTag("UserDeletedEvent")
- public void handle(UserEvents.UserDeletedEvent userDeletedEvent){
- List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDeletedEvent.getUserId(), userId -> new ArrayList<>());
- userEvents.add(userDeletedEvent);
- }
- }
Write a test case as follows:
- @SpringBootTest(classes = DemoApplication.class)
- @Slf4j
- class UserMessageConsumerTest {
- @Autowired
- private UserMessageConsumer userMessageConsumer;
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
- private List<Long> userIds;
- @BeforeEach
- void setUp() throws InterruptedException {
- this.userMessageConsumer.clean();
- this.userIds = new ArrayList<>();
- for (int i = 0; i< 100; i++){
- userIds.add(10000L + i);
- }
- this.userIds.forEach(userId -> sendMessage(userId));
- TimeUnit.SECONDS.sleep(3);
- }
- private void sendMessage(Long userId) {
- String topic = "consumer-test-topic";
- {
- String tag = "UserCreatedEvent";
- UserEvents.UserCreatedEvent userCreatedEvent = new UserEvents.UserCreatedEvent();
- userCreatedEvent.setUserId(userId);
- userCreatedEvent.setUserName("Name-" + userId);
- sendOrderlyMessage(topic, tag, userCreatedEvent);
- }
- {
- String tag = "UserEnableEvent";
- UserEvents.UserEnableEvent userEnableEvent = new UserEvents.UserEnableEvent();
- userEnableEvent.setUserId(userId);
- userEnableEvent.setUserName("Name-" + userId);
- sendOrderlyMessage(topic, tag, userEnableEvent);
- }
- {
- String tag = "UserDisableEvent";
- UserEvents.UserDisableEvent userDisableEvent = new UserEvents.UserDisableEvent();
- userDisableEvent.setUserId(userId);
- userDisableEvent.setUserName("Name-" + userId);
- sendOrderlyMessage(topic, tag, userDisableEvent);
- }
- {
- String tag = "UserDeletedEvent";
- UserEvents.UserDeletedEvent userDeletedEvent = new UserEvents.UserDeletedEvent();
- userDeletedEvent.setUserId(userId);
- userDeletedEvent.setUserName("Name-" + userId);
- sendOrderlyMessage(topic, tag, userDeletedEvent);
- }
- }
- private void sendOrderlyMessage(String topic, String tag, UserEvents.UserEvent event) {
- String shardingKey = String.valueOf(event.getUserId());
- String json = JSON.toJSONString(event);
- Message<String> msg = MessageBuilder
- .withPayload(json)
- .build();
- String destination = createDestination(topic, tag);
- SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);
- log.info("Send result is {} for msg", sendResult);
- }
- protected String createDestination(String topic, String tag) {
- if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
- return topic + ":" + tag;
- }else {
- return topic;
- }
- }
- @AfterEach
- void tearDown() {
- }
- @Test
- void getUserEvents() {
- this.userIds.forEach(userId ->{
- List<UserEvents.UserEvent> userEvents = this.userMessageConsumer.getUserEvents(userId);
- Assertions.assertEquals(4, userEvents.size());
- Assertions.assertTrue(userEvents.get(0) instanceof UserEvents.UserCreatedEvent);
- Assertions.assertTrue(userEvents.get(1) instanceof UserEvents.UserEnableEvent);
- Assertions.assertTrue(userEvents.get(2) instanceof UserEvents.UserDisableEvent);
- Assertions.assertTrue(userEvents.get(3) instanceof UserEvents.UserDeletedEvent);
- });
- }
- }
On startup, the following log line appears:
TagBasedDispatcherConsumerContainer : success to subscribe http://127.0.0.1:9876, topic consumer-test-topic, tag UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent, group user-message-consumer
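The log shows that the framework creates a Consumer with group user-message-consumer and subscribes to the tags UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent of consumer-test-topic, so the initialization flow behaves as expected.
The test logic is fairly simple:
In the logs, send and consume entries alternate: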
- UserMessageConsumerTest : Send result is SendResult [sendStatus=SEND_OK, msgId=2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD, offsetMsgId=C0A8010A00002A9F00000000056077FB, messageQueue=MessageQueue [topic=consumer-test-topic, brokerName=bogon, queueId=2], queueOffset=1121] for msg
- TagBasedDispatcherConsumerContainer : consume 2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FC cost: 0 ms
The test passes and the result is as expected.
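The per-user ordering asserted in the test depends on every message of one user sharing the same shardingKey, so that all of them land on the same queue. The sketch below illustrates the idea with a simplified hash-based selector. `selectQueue` and the queue count of 4 are assumptions made for illustration; this is not the selector code that rocketmq-spring actually ships.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ShardingKeySketch {

    // Simplified stand-in for a hash-based queue selector:
    // the same key always maps to the same queue index.
    static int selectQueue(String shardingKey, int queueCount) {
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues on the topic
        String[] tags = {"UserCreatedEvent", "UserEnableEvent", "UserDisableEvent", "UserDeletedEvent"};

        // Queue index -> messages in send order
        Map<Integer, List<String>> queues = new TreeMap<>();
        for (long userId = 10000L; userId < 10003L; userId++) {
            for (String tag : tags) {
                int queue = selectQueue(String.valueOf(userId), queueCount);
                queues.computeIfAbsent(queue, q -> new ArrayList<>()).add(userId + ":" + tag);
            }
        }
        queues.forEach((queue, messages) -> System.out.println("queue " + queue + " -> " + messages));
    }
}
```

Because the four events of a user stay on one queue in send order, a consumer that reads each queue sequentially sees them as created, enabled, disabled, deleted.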
The framework's initialization flow is as follows:

[Figure: framework initialization flow]
The runtime flow is as follows:
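As a rough illustration of both flows, the sketch below indexes annotated handler methods by tag at initialization, derives a `||`-joined subscription expression from them, and at runtime looks up the handler for a message's tag, deserializes the body into the handler's parameter type, and invokes it. Everything here is an assumption made for illustration: `HandleTag` is a local stand-in for the framework's annotation, the event and consumer classes are toys, and `parse` replaces real JSON deserialization. It is not the lego framework's actual implementation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

class TagDispatchSketch {

    // Hypothetical stand-in for the framework's @HandleTag annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface HandleTag {
        String value();
    }

    // Toy events standing in for the UserEvents classes.
    static class UserCreatedEvent {
        final long userId;
        UserCreatedEvent(long userId) { this.userId = userId; }
    }

    static class UserDeletedEvent {
        final long userId;
        UserDeletedEvent(long userId) { this.userId = userId; }
    }

    // Toy consumer with one handler method per tag.
    static class DemoConsumer {
        final List<String> handled = new ArrayList<>();

        @HandleTag("UserCreatedEvent")
        public void handle(UserCreatedEvent event) { handled.add("created:" + event.userId); }

        @HandleTag("UserDeletedEvent")
        public void handle(UserDeletedEvent event) { handled.add("deleted:" + event.userId); }
    }

    // Initialization: index annotated single-argument methods by tag.
    static Map<String, Method> indexHandlers(Class<?> consumerType) {
        Map<String, Method> handlers = new TreeMap<>(); // sorted keys give a stable expression
        for (Method method : consumerType.getDeclaredMethods()) {
            HandleTag handleTag = method.getAnnotation(HandleTag.class);
            if (handleTag != null && method.getParameterCount() == 1) {
                handlers.put(handleTag.value(), method);
            }
        }
        return handlers;
    }

    // Initialization: build the tag expression to subscribe with.
    static String subscriptionExpression(Map<String, Method> handlers) {
        return String.join("||", handlers.keySet());
    }

    // Runtime: find the handler by tag, deserialize the body, invoke the handler.
    static boolean dispatch(Object consumer, Map<String, Method> handlers, String tag, String body,
                            BiFunction<String, Class<?>, Object> deserializer) {
        Method method = handlers.get(tag);
        if (method == null) {
            return false; // no handler registered for this tag
        }
        Object event = deserializer.apply(body, method.getParameterTypes()[0]);
        try {
            method.invoke(consumer, event);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to handle tag " + tag, e);
        }
        return true;
    }

    // Toy deserializer: the body is just a user id; a real one would parse JSON.
    static Object parse(String body, Class<?> type) {
        long userId = Long.parseLong(body);
        if (type == UserCreatedEvent.class) return new UserCreatedEvent(userId);
        if (type == UserDeletedEvent.class) return new UserDeletedEvent(userId);
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    public static void main(String[] args) {
        DemoConsumer consumer = new DemoConsumer();
        Map<String, Method> handlers = indexHandlers(DemoConsumer.class);
        System.out.println(subscriptionExpression(handlers));
        dispatch(consumer, handlers, "UserCreatedEvent", "10000", TagDispatchSketch::parse);
        dispatch(consumer, handlers, "UserDeletedEvent", "10000", TagDispatchSketch::parse);
        System.out.println(consumer.handled);
    }
}
```

Building the index once at startup keeps the per-message work to a map lookup plus one reflective call, and adding a new message subtype only requires a new annotated method rather than another switch branch.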