场景一:用户注册后,异步发送积分。如何保证用户注册后,积分最终一定能发送成功呢?

场景二:支付扣款业务,如何保证当前业务与账务系统的最终一致性?如何保证第三方支付成功与本地业务DB的一致性?

消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交。然后消息异步发送到消息的消费方,消费方返回是否处理成功,如果成功,则更新本地状态。如果消息发送失败或者消费方处理失败,会进行重试(通过Job),重试达到最大次数,需要人工介入。
消息消费方,需要处理这个消息,并完成自己的业务逻辑,并且支持幂等操作。
如果是RPC方法发送消息,可以让服务端定期回调客户端(比如Dubbo的参数回调),更新服务端的状态,可以减少交互
架构图

本地消息事务表-MQ方案

实现方式
自定义注解+AOP方式+SPEL表达式(注意此AOP与事务AOP的顺序,一定要在事务内)
@EnableMessage(key = "#order.uniqueId")
public Order saveOrder(Order order)
闭包方式(不推荐,代码耦合严重,不推荐)
public Order saveOrder(Order order,TransactionCallBack<T> action)
本地消息事务表缺点:
Producer向MQ发送Prepare状态的消息(需要注册业务状态检查处理器)
Producer执行本地业务逻辑(执行本地事务)
架构图

原理分析
适用场景:对于有唯一流水的情况比较合适(流水只插入不更新的情况),因为当MQ回查Producer事务是否提交的时候,如果流水被更新可能导致MQ误以为事务没有执行成功,导致消息被作废掉
优点: 相对于本地事务表,业务方只需要提供状态回查接口即可,不需要做其他额外操作(不需要独立启动一个定时任务)。
缺点:
场景:对于不支持2PC事务机制的MQ。
优点:
缺点:
对于不支持2PC事务机制的MQ,可以开发一套【消息服务中间件】来解决这个问题,这个中间件包含几大模块
架构图

服务功能模块
事务消息状态
事务消息数据库设计
CREATE TABLE `transaction_message` (
`id` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '主键',
`business_producer` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '业务发起方(业务方描述)',
`business_unique_id` varchar(100) COLLATE utf8mb4_bin NOT NULL COMMENT '业务数据唯一标识',
`message_id` varchar(100) COLLATE utf8mb4_bin NOT NULL COMMENT '消息id,唯一标识',
`message_body` mediumtext COLLATE utf8mb4_bin NOT NULL COMMENT '消息内容',
`message_data_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '消息类型,1 json',
`message_send_times` int(11) NOT NULL DEFAULT '0' COMMENT '消息发送次数',
`producer_callback_times` int(11) NOT NULL DEFAULT '0' COMMENT '回调发起方次数',
`consumer_callback_times` int(11) NOT NULL DEFAULT '0' COMMENT '回调消费方次数',
`message_queue_type` varchar(100) COLLATE utf8mb4_bin NOT NULL COMMENT '消费队列或者topic',
`status` tinyint(4) NOT NULL COMMENT '状态,1等待确认 ,2已经确认 , 3废弃,4已经发送但未确认,5已经发送并且已经确认,6MQ发送达到最大次数,7producer消息回查最大重试次数,8consumer消息回查最大重试次数',
`mark` varchar(100) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '备注,额外消息',
`business_request_time` datetime NOT NULL COMMENT '业务请求时的时间',
`business_receive_time` datetime NOT NULL COMMENT '接收到业务请求时的时间',
`version` int(11) NOT NULL DEFAULT '1' COMMENT '乐观锁版本号',
`create_time` datetime NOT NULL COMMENT '创建时间',
`last_update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '最后一次修改时间',
PRIMARY KEY (`id`),
UNIQUE KEY `unq_business` (`business_unique_id`,`business_producer`) USING BTREE,
KEY `idx_create_time` (`create_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='事务消息表';
如果后期对接的业务方非常多,数据量非常大,可以考虑
逻辑视图

技术栈

比如用户注册,发送积分。由于积分服务是采购的,无法整合MQ,此时可以直接异步调用积分服务的接口进行处理!
解决以上问题,又会存在如下问题
直连架构图,消息中间件可以支持多种MQ扩展,也可以支持内存级别的事件发布

常见的方案
要面临的问题
实现原理
实现方式