• 统一消息分发中心设计


    背景

    我们核心业务中订单完成时,需要完成后续的连带业务,扣件库存库存、增加积分、通知商家等。

    如下图的架构:

    这样设计出来导致我们的核心业务和其他业务耦合,每次新增连带业务或者去掉连带业务都需要修改核心业务。

    一方面,不合符软件设计的OCP原则;

    二方面,修改核心业务风险、成本也是很大的。

    方案

    基于上述方案的问题,我们设计了新的方案。新的方案,可以动态接入新的连带业务,不会入侵核心业务,降低了变动的风险和成本。

    1. 订单下发统一的ORDER_CREATED事件消息;

     消息格式如下:

    {
        "enterEvent":"ORDER_CREATED",
        "data":{
            "order_id":"OR2023111000000001"
        },
        "source":"ORDER",
        "datetime":"2023-11-10 21:40:52"
    }

    2. 所有核心业务发送到MQ的消息,统一发送到分发中心DISPATCHER_CENTER。

    消息统一中心获取配置的路由信息,将消息发送到MQ。

    1. **
    2. * @author darmi
    3. */
    4. @Component
    5. public class KafkaEventListener {
    6. @Autowired
    7. private MsgDispatcherCenterRepository msgDispatcherCenterRepository;
    8. @Autowired
    9. private KafkaTemplate kafkaTemplate;
    10. @Autowired
    11. @Qualifier(value = "eventExecutor")
    12. private Executor eventExecutor;
    13. @KafkaListener(topics = {"DISPATCHER_CENTER"})
    14. public void dispatchMsg(String event) {
    15. DispatcherCenterEvent dispatcherCenterEvent = DispatcherCenterEvent.getObject(event);
    16. eventExecutor.execute(() ->{
    17. msgDispatcherCenterRepository.findMsgDispatcherCenterByCenterEventAndActive(
    18. dispatcherCenterEvent.getCenterEvent(), Boolean.TRUE)
    19. .forEach(e -> kafkaTemplate.send(e.getRouteEvent(), dispatcherCenterEvent.getData()));
    20. });
    21. }
    22. }

     

    Mysql的消息路由表设计如下:

    1. CREATE TABLE `tb_msg_dispatcher_ center` (
    2. `id` int NOT NULL AUTO_INCREMENT,
    3. `center_event` varchar(255) NOT NULL,
    4. `route_event` varchar(255) NOT NULL,
    5. `active` tinyint NOT NULL DEFAULT '0',
    6. `created` timestamp NOT NULL,
    7. `updated` timestamp NOT NULL,
    8. PRIMARY KEY (`id`),
    9. KEY `idx_center_event_active` (`center_event`,`active`)
    10. ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

     

     3. 各个业务监听消息,处理自己的业务。

    1. @KafkaListener(topics = {"ADD_POINT"})
    2. public void addPoint(String event) {
    3. // 添加积分
    4. }
    5. @KafkaListener(topics = {"REDUCE_STOCK"})
    6. public void reduceStock(String event) {
    7. // 扣件库存
    8. }
    9. @KafkaListener(topics = {"NOTICE_MERCHANT"})
    10. public void noticeMerchant(String event) {
    11. // 通知商家
    12. }

    总结

    这个方案是一个简易可行的方案,符合快速上手并实施。在人力、时间、成本不充足的情况下,基本能满足我们的需求。

    如果想让它作为平台级的技术组件推广,还有一些细节的点可以优化。

    • 核心的业务也可以分离出来,通过平台配置的方式自动分发数据到消息中心。
    • 消息中心每次都会从数据库拉去路由表信息,性能不好,可以放在分布式缓存或本地内存。这时需要注意缓存数据的一致性问题。
    • 分发中心是否存在性能瓶颈、集群化等。
    • 连带业务是否也可以通过配置,自动拉取MQ的消息。

  • 相关阅读:
    模型保存和加载
    Yolov8小目标检测(23):多分支卷积模块RFB,扩大感受野提升小目标检测精度
    部署Zookeeper集群和Kafka集群
    Python --- 在python中安装NumPy,SciPy,Matplotlib以及scikit-learn(Windows平台)
    Redis Sentinel集群管理手册
    kali安装docker(亲测有效)
    LNMP平台搭建
    AI只需26秒,就可以设计一款会走路的机器人
    ①css实现九宫格动画——前端筑基&&每天一道前端案例实现
    大数据学习(8)-hive压缩
  • 原文地址:https://blog.csdn.net/qq_28175019/article/details/134170717