说到事务大家都不陌生,必须满足4个条件(ACID):原子性(Atomicity,或称不可分割性)、一致性(Consistency)、隔离性(Isolation,又称独立性)、持久性(Durability)。RocketMQ的事务消息和数据库中的数据没有太多关系,但是本质上是基于2PC理论实现的最终一致性。
RocketMQ提供了一个事务监听器接口TransactionListener,里面有两个方法:executeLocalTransaction中执行本地事务方法,执行完返回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW,分别对应事务消息执行成功、回滚、未知,如果是未知状态则会触发Broker调用checkLocalTransaction进行状态回查。
1、 Producer在发送消息的时候会指定该消息为半消息,Broker接受到半消息是不能被Consumer消费的。
2、 Broker向Producer发送半消息发送成功标志。
3、 Producer开始执行本地事务方法。
4、 Producer执行完本地事务方法开始向Broker发送提交或回滚事务消息命令。
5、 如果Producer发送的事务消息状态是未知,则会定时向Producer进行回查。
6、 通过回查确定事务消息执行状态再觉得是否将消息暴露给消费者。
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction 代码有删减
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// ①
TransactionListener transactionListener = getCheckListener();
......
SendResult sendResult = null;
// ②
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
// ③
sendResult = this.send(msg);
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
else if (transactionListener != null) {
log.debug("Used new transaction API");
// ④
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
// ⑤
this.endTransaction(sendResult, localTransactionState, localException);
return transactionSendResult;
}
①、从Producer拿到TransactionListener 监听器。
②、给事务消息打上标识,在发送到Broker时会做特殊处理。
③、发送消息。
④、执行本地事务方法。
⑤、结束事务方法,告诉Broker是进行提交或者回滚。
前面说过在Broker接受到事务消息的时候,会调用prepareMessage进行存储消息,中间会对消息做特殊处理。
org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
TransactionalMessageBridge就是处理普通消息和事务消息的桥梁。首先会备份原始消息的topic和queueId,以备消息还原,然后再将消息的topic设置为半消息内置topic中,queueId固定为0,此时的topic对消费者是不可见的。
事务消息发送完成,就等待Producer提交事务消息执行状态,在Broker会分发给EndTransactionProcessor处理。如果是提交状态,则取出半消息,还原原始消息的属性,并重新投递到队列中去;如果消息是回滚,则从半消息队列中移除,投入到RMQ_SYS_TRANS_OP_HALF_TOPIC主题下。
那么如果Producer反馈的消息状态是未知的话,Broker也是提供了回查机制的,并且默认回查的次数数15次,处理线程是TransactionalMessageCheckService。
到这里事务消息的介绍就结束了,你有没有一点感触呢,是否能推断出RocketMQ是如何处理延迟消息的么?