• 仅此一招,再无消息乱序的烦恼


    1. 概览

    RocketMQ 早已提供了一组最佳实践,但工作在一线的伙伴却很少知道,项目中的各种随性代码经常导致消息错乱问题,严重影响业务的准确性。为了保障最佳实践的落地,降低一线伙伴的使用成本,统一 MQ 使用规范,需要对其进行抽象和封装…

    1.1. 背景

    RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。

    在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称,简单示例如下:

    1. // 计算 destination
    2. protected String createDestination(String topic, String tag) {
    3.     if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
    4.         return topic + ":" + tag;
    5.     }else {
    6.         return topic;
    7.     }
    8. }
    9. // 发送信息
    10. String destination = createDestination(topic, tag);
    11. SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);

    tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

    但,在消费消息时,就变的没那么方便了,简单示例如下:

    1. @Service
    2. @RocketMQMessageListener(
    3.     topic = "consumer-test-topic-1",
    4.         consumerGroup ="user-message-consumer-1",
    5.         selectorExpression = "*",
    6.         consumeMode = ConsumeMode.ORDERLY
    7. )
    8. @Slf4j
    9. public class RocketBasedUserMessageConsumer extends UserMessageConsumer
    10.     implements RocketMQListener {
    11.     @Override
    12.     public void onMessage(MessageExt message) {
    13.         String tag = message.getTags();
    14.         byte[] body = message.getBody();
    15.         log.info("handle msg body {}"new String(body));
    16.         switch (tag){
    17.             case "UserCreatedEvent":
    18.                 UserEvents.UserCreatedEvent createdEvent = JSON.parseObject(body, UserEvents.UserCreatedEvent.class);
    19.                 handle(createdEvent);
    20.                 return;
    21.             case "UserEnableEvent":
    22.                 UserEvents.UserEnableEvent enableEvent = JSON.parseObject(body, UserEvents.UserEnableEvent.class);
    23.                 handle(enableEvent);
    24.                 return;
    25.             case "UserDisableEvent":
    26.                 UserEvents.UserDisableEvent disableEvent = JSON.parseObject(body, UserEvents.UserDisableEvent.class);
    27.                 handle(disableEvent);
    28.                 return;
    29.             case "UserDeletedEvent":
    30.                 UserEvents.UserDeletedEvent deletedEvent = JSON.parseObject(body, UserEvents.UserDeletedEvent.class);
    31.                 handle(deletedEvent);
    32.                 return;
    33.         }
    34.     }
    35. }

    该方法有几个问题:

    1. tag 维护成本较高,RocketMQMessageListener 设置 selectorExpression 为 *,将拉取全部数据,增加通讯成本;如果使用 tag1 || tag2 方式,每次调整都需要对代码和配置进行更新,特别容易遗漏;
    2. 充斥大量模板代码,比如 case 分支,反序列化,调用业务方法等;
    3. API 具有侵入性,开发是需要关心 RocketMQ API,存在一定学习成本;

    1.2. 目标

    提供一种面向业务场景的,灵活进行业务扩展的模式,具有以下特征:

    1. Tag 和代码保持一致,不需要多处配置,新增逻辑自动完成 Tag 注册;
    2. 消除模板方法,类中只保留核心业务方法,框架完成 方法分发、消息反序列化等操作;
    3. 代码零侵入,仅使用注解,无需了解 RocketMQ API;

    2. 快速入门

    框架依赖
    rocketmq-spring-boot-starter 完成消息发送和回收。

    2.1. 环境准备

    2.1.1. 增加依赖

    首先,增加 rocketmq 相关依赖。

    1. <dependency>
    2.     <groupId>org.apache.rocketmq</groupId>
    3.     <artifactId>rocketmq-spring-boot-starter</artifactId>
    4.     <version>2.2.1</version>
    5. </dependency>

    然后,增加 lego starter。

    1. <dependency>
    2.     <groupId>com.geekhalo.lego</groupId>
    3.     <artifactId>lego-starter</artifactId>
    4.     <version>0.1.13-tag_based_dispatcher_message_consumer-SNAPSHOT</version>
    5. </dependency>

    2.1.2. 增加配置

    在 application.yml 文件中增加 rocketmq 配置。

    1. rocketmq:
    2.   name-server: http://127.0.0.1:9876
    3.   producer:
    4.     group: rocket-demo

    2.2. 定义消费者

    定义消费者,只需:

    1. 在 Bean 上增加 @TagBasedDispatcherMessageConsumer 注解,并指定 topic 和 consumer
    2. 在 Bean 的方法上添加 @HandleTag 注解,并指定监听的 tag

    示例如下:

    1. @TagBasedDispatcherMessageConsumer(
    2.         topic = "consumer-test-topic",
    3.         consumer = "user-message-consumer"
    4. )
    5. public class UserMessageConsumer {
    6.     private final Map<LongList<UserEvents.UserEvent>> events = Maps.newHashMap();
    7.     public void clean(){
    8.         this.events.clear();;
    9.     }
    10.     public List<UserEvents.UserEventgetUserEvents(Long userId){
    11.         return this.events.get(userId);
    12.     }
    13.     @HandleTag("UserCreatedEvent")
    14.     public void handle(UserEvents.UserCreatedEvent userCreatedEvent){
    15.         List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userCreatedEvent.getUserId(), userId -> new ArrayList<>());
    16.         userEvents.add(userCreatedEvent);
    17.     }
    18.     @HandleTag("UserEnableEvent")
    19.     public void handle(UserEvents.UserEnableEvent userEnableEvent){
    20.         List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userEnableEvent.getUserId(), userId -> new ArrayList<>());
    21.         userEvents.add(userEnableEvent);
    22.     }
    23.     @HandleTag("UserDisableEvent")
    24.     public void handle(UserEvents.UserDisableEvent userDisableEvent){
    25.         List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDisableEvent.getUserId(), userId -> new ArrayList<>());
    26.         userEvents.add(userDisableEvent);
    27.     }
    28.     @HandleTag("UserDeletedEvent")
    29.     public void handle(UserEvents.UserDeletedEvent userDeletedEvent){
    30.         List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDeletedEvent.getUserId(), userId -> new ArrayList<>());
    31.         userEvents.add(userDeletedEvent);
    32.     }
    33. }

    2.3. 测试

    编写测试用例如下:

    1. @SpringBootTest(classes = DemoApplication.class)
    2. @Slf4j
    3. class UserMessageConsumerTest {
    4.     @Autowired
    5.     private UserMessageConsumer userMessageConsumer;
    6.     @Autowired
    7.     private RocketMQTemplate rocketMQTemplate;
    8.     private List<Long> userIds;
    9.     @BeforeEach
    10.     void setUp() throws InterruptedException {
    11.         this.userMessageConsumer.clean();
    12.         this.userIds = new ArrayList<>();
    13.         for (int i = 0; i< 100; i++){
    14.             userIds.add(10000+ i);
    15.         }
    16.         this.userIds.forEach(userId -> sendMessage(userId));
    17.         TimeUnit.SECONDS.sleep(3);
    18.     }
    19.     private void sendMessage(Long userId) {
    20.         String topic = "consumer-test-topic";
    21.         {
    22.             String tag = "UserCreatedEvent";
    23.             UserEvents.UserCreatedEvent userCreatedEvent = new UserEvents.UserCreatedEvent();
    24.             userCreatedEvent.setUserId(userId);
    25.             userCreatedEvent.setUserName("Name-" + userId);
    26.             sendOrderlyMessage(topic, tag, userCreatedEvent);
    27.         }
    28.         {
    29.             String tag = "UserEnableEvent";
    30.             UserEvents.UserEnableEvent userEnableEvent = new UserEvents.UserEnableEvent();
    31.             userEnableEvent.setUserId(userId);
    32.             userEnableEvent.setUserName("Name-" + userId);
    33.             sendOrderlyMessage(topic, tag, userEnableEvent);
    34.         }
    35.         {
    36.             String tag = "UserDisableEvent";
    37.             UserEvents.UserDisableEvent userDisableEvent = new UserEvents.UserDisableEvent();
    38.             userDisableEvent.setUserId(userId);
    39.             userDisableEvent.setUserName("Name-" + userId);
    40.             sendOrderlyMessage(topic, tag, userDisableEvent);
    41.         }
    42.         {
    43.             String tag = "UserDeletedEvent";
    44.             UserEvents.UserDeletedEvent userDeletedEvent = new UserEvents.UserDeletedEvent();
    45.             userDeletedEvent.setUserId(userId);
    46.             userDeletedEvent.setUserName("Name-" + userId);
    47.             sendOrderlyMessage(topic, tag, userDeletedEvent);
    48.         }
    49.     }
    50.     private void sendOrderlyMessage(String topic, String tag, UserEvents.UserEvent event) {
    51.         String shardingKey = String.valueOf(event.getUserId());
    52.         String json = JSON.toJSONString(event);
    53.         Message<String> msg = MessageBuilder
    54.                 .withPayload(json)
    55.                 .build();
    56.         String destination = createDestination(topic, tag);
    57.         SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);
    58.         log.info("Send result is {} for msg", sendResult, msg);
    59.     }
    60.     protected String createDestination(String topic, String tag) {
    61.         if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
    62.             return topic + ":" + tag;
    63.         }else {
    64.             return topic;
    65.         }
    66.     }
    67.     @AfterEach
    68.     void tearDown() {
    69.     }
    70.     @Test
    71.     void getUserEvents() {
    72.         this.userIds.forEach(userId ->{
    73.             List<UserEvents.UserEvent> userEvents = this.userMessageConsumer.getUserEvents(userId);
    74.             Assertions.assertEquals(4, userEvents.size());
    75.             Assertions.assertTrue(userEvents.get(0) instanceof UserEvents.UserCreatedEvent);
    76.             Assertions.assertTrue(userEvents.get(1) instanceof UserEvents.UserEnableEvent);
    77.             Assertions.assertTrue(userEvents.get(2) instanceof UserEvents.UserDisableEvent);
    78.             Assertions.assertTrue(userEvents.get(3) instanceof UserEvents.UserDeletedEvent);
    79.         });
    80.     }
    81. }

    启动时,可以看到如下日志:

    TagBasedDispatcherConsumerContainer : success to subscribe  http://127.0.0.1:9876, topic consumer-test-topic, tag UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent, group user-message-consumer
    

    从日志上可以看出,框架以组 group user-message-consumer 创建 Consumer,并订阅 consumer-test-topic 的 UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent 等 Tag,初始化流程符合预期。

    测试逻辑比较简单,逻辑如下:

    1. 创建 100 个用户
    2. 每个用户创建并依次发布领域事件,UserCreatedEvent、UserEnableEvent、UserDisableEvent、UserDeletedEvent
    3. 消费发送完成后,停顿 3 秒
    4. 依次检测每个用户收到的消息,并对顺序进行检测

    观察日志,可以看到发送和消费日志交替出现:

    1. UserMessageConsumerTest        : Send result is SendResult [sendStatus=SEND_OK, msgId=2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD, offsetMsgId=C0A8010A00002A9F00000000056077FB, messageQueue=MessageQueue [topic=consumer-test-topic, brokerName=bogon, queueId=2], queueOffset=1121for msg
    2. TagBasedDispatcherConsumerContainer : consume 2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FC cost: 0 ms

    用例通过,运行结果符合预期。

    3. 设计&扩展

    3.1. 初始化流程

    image

     

    框架初始化流程如下:

    1. TagBasedDispatcherConsumerContainerRegistry 实现 Spring 的 BeanPostProcessor 接口,依次对托管 bean 进行处理;
    2. 如果 Bean 上存在 @TagBasedDispatcherMessageConsumer 注解,便会提取配置信息,构建 TagBasedDispatcherConsumerContainer 实例
    3. TagBasedDispatcherConsumerContainer 收集方法上的 @HandleTag 注解,结合 @TagBasedDispatcherMessageConsumer 上的 topic、consumer 等信息构建 DefaultMQPushConsumer 并完成 topic 和 tag 的订阅
    4. TagBasedDispatcherConsumerContainer 内部会构建 tag 与 method 的映射关系,以对指定tag进行处理;

    3.2. 运行流程

     

    image
    运行流程如下:

    1. 消息发送者将消息发送至 MQ;
    2. MQ 将消息发送至 Consumer;
    3. Consumer 收到消息后,根据 tag 对消息进行分发;
    4. 处理器对消息进行反序列化,获取调用参数,然后调用方法执行业务逻辑;
  • 相关阅读:
    电源芯片的选择简略
    python项目之requirements.txt文件
    欺诈检测中的不平衡分类
    JVM基础_10_各种垃圾回收器
    P03 MySQL 数据类型详解
    c++新闻发布系统(支持登录注册)
    安全开发实战(4)--whois与子域名爆破
    Kotlin 协程
    JAVA架构之路(MySql事务隔离级别)
    【论文总结】Composition Kills: A Case Study of Email Sender Authentication
  • 原文地址:https://blog.csdn.net/m0_74931226/article/details/127916252