• 【项目】数据库事务与MQ发送一致性


    数据库事务与MQ发送一致性

    技术背景

    在进行服务改造的时候,会需要进行服务技术架构的选型。其中比较关键的就是通信中间件的选择。
    不同服务之间,同步通信比较典型的代表是 RPC ,异步通信的典型代表是 MQ 。

    问题现状

    在电商业务中,如果需要使用 MQ 作为消息通信中间件,那么需要解决 MQ 的一致性问题。MQ 提供一致性保障又分为两个方面。

    发消息时确保业务操作和发消息是一致的;接收消息的时候要确保消息最终被正常处理了。

    发送方一般通过事务保证,消费方通常使用消费 ACK 和重试来达到一致性。

    数据库事务解决 MQ 发送一致性

    以 MySQL 为例,在同一实例中的不同 db ,如果共享同一个 Connection 的话,是可以在同一个事务中的。

    同一实例,指的是使用同一台机器;同一个 Connection ,指的是同一个服务使用这台机器。

    那么这种情况下,执行下面这种操作,是可以支持在同一事务中进行操作的。

    begin transaction;
    insert into A.tbl1(name, age) values('admin', 18);
    insert into B.tbl2(num) values(20);
    end transaction;
    
    • 1
    • 2
    • 3
    • 4

    依托于这样的实现,我们可以在所有的 MySQL 实例中,均创建出一个专门负责 MQ 的 db 。且这个 db 对于应用透明,那么我们可以在使用的时候,将发送消息与业务操作放在同一个事务中即可。
    在电商业务场景中,在支付的时候要插入支付流水,同时还需要发送一条消息通知其他业务系统。那么在这种场景下,我们需要保证两个处理同时完成。

    @Transactional
    public void pay(order order) {
        PayTransaction t = buildPayTransaction(order);
        payDao.append(t);
        producer.sendMessage(buildMessage(t));
        final Message message = buildMessage(t);
        messageDao.insert(message);
        // 在事务提交后执行
        triggerAfterTransactionCommit(()-> {
            messageclient.send(message);
            messageDao.delete(message);
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    实际上在 producer.sendMessage 执行的时候,消息并没有通过网络发送出去,而仅仅是往业务 DB 同一个实例上的消息库插入一条记录,然后注册事务的回调。在这个事务真正提交后消息才从网络发送出去,这个时候如果发送到 consumer 成功的话消息会被立即删除掉。而如果消息发送失败则消息就留在消息库里,这个时候我们会有一个补偿任务会将这些消息从指定的 message db 消息库里捞出然后重新发送,直到发送成功。
    整个流程图如图所示:

    这样的结构下,每次发送消息,第一个可以利用 db 中已经持久化的数据进行,第二个可以使用定时任务做数据补偿。

    简单模式下的事务 MQ 一致性

    在一个事务中,同时保证事务和 MQ 的发送一致性,可以使用事务监听的方式实现。
    在 Spring 中的声明式事务方式中,可以使用 TransactionSynchronizationManager 事务同步管理器,对事务进行后置增强,指定发送 MQ 的操作在事务提交之后完成。但是对于 MQ 的发送成功,需要做一些其他的补偿机制。 MQ 发送到 Exchange 的过程中,如果发生问题,可能会导致发送失败的情况。
    这种情况下,比较好的能提高发送成功的概率的方法,是可以使用@Retryable注解的。多重试几次,直到确认为止。

    @Transactional
    public void finishOrder(Order order){
        // 添加订单成功
        insertOrderSuccess(order);
        
        // 发送消息到 MQ
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter(){
            @Override
            public void afterCommit() {
                mqService.send(order);
            }
        });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这种直接写样板模式的代码,耦合度比较高,而且会造成大量重复。这种情况下,需要判断当前是否存在事务,否则会报错:

    java.lang.IllegalStateException: Transaction synchronization is not active

    正确方式是:

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    
    public void sendLog() {
        // 判断当前是否存在事务
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            // 无事务,异步发送消息给kafka
            
            executor.submit(() -> {
                // 发送消息给kafka
                try {
                    // 发送消息给kafka
                } catch (Exception e) {
                    // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
                }
            });
            return;
        }
        
        // 有事务,则添加一个事务同步器,并重写afterCompletion方法(此方法在事务提交后会做回调)
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            
            @Override
            public void afterCompletion(int status) {
                if (status == TransactionSynchronization.STATUS_COMMITTED) {
                    // 事务提交后,再异步发送消息给kafka
                    executor.submit(() -> {
                        try {
                            // 发送消息给kafka
                        } catch (Exception e) {
                            // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
                        }
                    });
                }
            }
            
        });
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    为了方便使用,可以进行事务监听,遵循这样的业务流程:

    注册事件 -> 事件监听 -> 事务提交 -> 事件执行

    解耦之后的代码:

    @Service
    @Slf4j
    public class UserServiceImpl extends implements UserService {
    
    	@Autowired
        UserMapper userMapper;
        	
    	@Autowired
        ApplicationEventPublisher eventPublisher;
    	
    	public void userRegister(User user){
    		userMapper.insertUser(user);
    		eventPublisher.publishEvent(new UserRegisterEvent(new Date()));
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    自定义事件:

    public class UserRegisterEvent extends ApplicationEvent {
    
        private Date registerDate;
    
        public UserRegisterEvent(Date registerDate) {
            super(registerDate);
            this.registerDate = registerDate;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    事件监听器:

    @Slf4j
    @Component
    public class UserListener {
    
        @Autowired
        UserService userService;
    
        @Async
        @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = UserRegisterEvent.class)
        public void onUserRegisterEvent(UserRegisterEvent event) {
            userService.sendActivationCode(event.getRegisterDate());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    消息驱动事务补偿

    事件背景

    在 DDD 架构中,要实现领域事件驱动优化,需要将不同的领域消息进行抽象,使用 MQ 消息来进行事件驱动,完成事件的最终一致性。

    领域事件

    在实践之前,需要了解一下领域事件的概念。
    领域事件(Domain Events)是领域驱动设计(Domain Driven Design,DDD)中的一个概念,用于捕获我们所建模的领域中所发生过的事情。
    它用来表示领域中发生的事件。一个领域事件将导致进一步的业务操作,在实现业务解耦的同时,还有助于形成完整的业务闭环。
    举例来说的话,领域事件可以是业务流程的一个步骤,比如投保业务缴费完成后,触发投保单转保单的动作;也可能是定时批处理过程中发生的事件,比如批处理生成季缴保费通知单,触发发送缴费邮件通知操作;或者一个事件发生后触发的后续动作,比如密码连续输错三次,触发锁定账户的动作。
    如何识别领域事件:在做用户旅程或者场景分析时,我们要捕捉业务、需求人员或领域专家口中的关键词:“如果发生……,则……”“当做完……的时候,请通知……”“发生……时,则……”等。在这些场景中,如果发生某种事件后,会触发进一步的操作,那么这个事件很可能就是领域事件。

    实践过程

    在实践中,需要做的是:

    事件发布 -> 消息发布 -> 消息传递 -> 消息消费 -> 事件处理

    主要分为五个步骤来划分这一过程。现在需要着重处理的是上游业务,也就是事件发布和消息发布。
    在事件发布中,我们通常定义一个通用的领域事件,通常需要包含事件 ID 、时间、主题、数据等。

    public class DomainEvent<T> {
        private String eventId;
        private String eventTopic;
        private Date eventTime;
        private T eventData;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    之后通常会需要创建一个事件发布器:

    /**
    * 事件预发布,在有事务的时候需要先保存事件,之后再通过MQ进行发布
    */
    public <T> void prePublish(DomainEvent<T> domainEvent) {
        domainEventService.save(domainEvent);
        // 将消息添加到ThreadLocal中
        addDomainEvent(domainEventEntity);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    消息的发布:

    public void publish() {
        // 移除ThreadLocal中的消息并进行发送
        List<DomainEvent> list = removeAndGet();
        for (DomainEvent event : list) {
            try {
                // 发送消息
                MqService.send(event);
            } catch (Exception e) {
                // 抛出异常
                throw new Exception();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    整体的执行流程:

    事务执行 -> 生成事件 -> 事件预发送 -> 事件发送

    整体的流程没问题,但是需要做的是事件补偿。事务补偿,可以利用定时任务,扫描对应的数据库中的持久化事件数据是否存在。如果存在,就取出来做发送;反之则不操作。
    添加到 ThreadLocal 中,是为了减少数据库的 I/O 操作,直接从 ThreadLocal 中查询数据会更快速,也更方便做事件补偿。

  • 相关阅读:
    【pandas小技巧】--日期相关处理
    【Robotframework+python】实现http接口自动化测试
    Anchor-free目标检测综述 -- Dense Prediction篇
    rabbitmq发送消息通用接口
    在c#中如何将多个点位(Point)转换为多边形(Polygon)并装换为shp图层
    【c++_containers】10分钟带你学会list
    力扣题目训练(17)
    Head First设计模式(阅读笔记)-04.工厂模式
    3.5 Android gpu_mem ebpf程序设计原理(一)
    【Windows系统5分钟搭建Linux环境】
  • 原文地址:https://blog.csdn.net/qq_43103529/article/details/126669449