• RocketMQ事务消息 超时重发还是原来的消息吗?


    以下面的一个demo例子来分析一下,探索RocketMQ事务消息原理。

        public static final String PRODUCER_GROUP = "tran-test";
        public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
        public static final String TOPIC = "Test";
    
        public static void main(String[] args) throws Exception {
            TransactionListener transactionListener = new TransactionListener() {
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    System.out.println(String.format("executeLocalTransaction: %s", msg.getTransactionId()));
                    return LocalTransactionState.UNKNOW;
                }
    
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    System.out.println(String.format("checkLocalTransaction: tranId=%s, commitLogOffset=%s, queueOffset=%s, msgId=%s",
                            msg.getTransactionId(), msg.getCommitLogOffset(),
                            msg.getQueueOffset(), msg.getMsgId()));
                    return LocalTransactionState.UNKNOW;
                }
            };
            TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
            producer.setTransactionListener(transactionListener);
            producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
            producer.start();
            Message msg = new Message(TOPIC, "test".getBytes());
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.println(String.format("sendResult: tranId=%s, offsetMsgId=%s, queueOffset=%s msgId=%s",
                    sendResult.getTransactionId(), sendResult.getOffsetMsgId(),
                    sendResult.getQueueOffset(), sendResult.getMsgId()));
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatch.await();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    executeLocalTransaction: C0DE00428BEC18B4AAC27F377B6E0000
    sendResult: tranId=C0DE00428BEC18B4AAC27F377B6E0000, offsetMsgId=null, queueOffset=82 msgId=C0DE00428BEC18B4AAC27F377B6E0000
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315411, queueOffset=83, msgId=C0DE004200002A9F0000000000141253
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1315805, queueOffset=84, msgId=C0DE004200002A9F00000000001413DD
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316199, queueOffset=85, msgId=C0DE004200002A9F0000000000141567
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316593, queueOffset=86, msgId=C0DE004200002A9F00000000001416F1
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1316987, queueOffset=87, msgId=C0DE004200002A9F000000000014187B
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317381, queueOffset=88, msgId=C0DE004200002A9F0000000000141A05
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1317775, queueOffset=89, msgId=C0DE004200002A9F0000000000141B8F
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318169, queueOffset=90, msgId=C0DE004200002A9F0000000000141D19
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318563, queueOffset=91, msgId=C0DE004200002A9F0000000000141EA3
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1318957, queueOffset=92, msgId=C0DE004200002A9F000000000014202D
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319352, queueOffset=93, msgId=C0DE004200002A9F00000000001421B8
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1319747, queueOffset=94, msgId=C0DE004200002A9F0000000000142343
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320142, queueOffset=95, msgId=C0DE004200002A9F00000000001424CE
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320537, queueOffset=96, msgId=C0DE004200002A9F0000000000142659
    checkLocalTransaction: tranId=C0DE00428BEC18B4AAC27F377B6E0000, commitLogOffset=1320932, queueOffset=97, msgId=C0DE004200002A9F00000000001427E4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    通过上述例子的输出结果可以发现,checkLocalTransaction中queueOffset、msgId都发生的变化。那么在broker中到底发生了什么呢。

    事务消息原理

    客户端发送一个事务消息时,MessageConst.PROPERTY_TRANSACTION_PREPARED=“true” 标记这个消息是一个事务消息。

            SendResult sendResult = null;
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
            try {
                sendResult = this.send(msg);
            } catch (Exception e) {
                throw new MQClientException("send message Exception", e);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    broker在收到消息时会取出traFlag,如果traFlag=true消息将交给TransactionalMessageService处理

            String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            boolean sendTransactionPrepareMessage = false;
            if (Boolean.parseBoolean(traFlag)
                && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
                if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
                    return response;
                }
                sendTransactionPrepareMessage = true;
            }
    
            long beginTimeMillis = this.brokerController.getMessageStore().now();
    
            if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
                CompletableFuture<PutMessageResult> asyncPutMessageFuture;
                if (sendTransactionPrepareMessage) {
                    //处理事务消息
                    asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
                } else {
                    asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    TransactionalMessageService在保存消息时会将原来的topic使用RMQ_SYS_TRANS_HALF_TOPIC来替换,原topic信息存放在properties中。这样在是先把消息保存下来,而不让Consumer立刻就能收到。
    当收到TransactionMQProducer发来的COMMIT_MESSAGE时,再将消息从RMQ_SYS_TRANS_HALF_TOPIC取出替换成原来的topic写入。同时再向RMQ_SYS_TRANS_OP_HALF_TOPIC的topic中也写一份。
    broker通过对比RMQ_SYS_TRANS_OP_HALF_TOPIC和RMQ_SYS_TRANS_HALF_TOPIC中是否同时存在来判断事务消息是否结束了。
    当收到的不是COMMIT_MESSAGE而是UNKNOW时,TransactionalMessageCheckService会定时回调TransactionMQProducer#checkLocalTransaction查询本地事务状态,默认最多检查15次。

    在这里插入图片描述

    TransactionalMessageCheckService

    TransactionalMessageCheckService是一个运行在broker中的一个线程,线程默认每1分钟执行一次来检测系统中超时的half事务消息并发起重试。

        @Override
        public void check(long transactionTimeout, int transactionCheckMax,
            AbstractTransactionalMessageCheckListener listener) {
            try {
                String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
                Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
                if (msgQueues == null || msgQueues.size() == 0) {
                    log.warn("The queue of topic is empty :" + topic);
                    return;
                }
                log.debug("Check topic={}, queues={}", topic, msgQueues);
                for (MessageQueue messageQueue : msgQueues) {
                    long startTime = System.currentTimeMillis();
                    //每个half queue都有一个对应的op queue
                    MessageQueue opQueue = getOpQueue(messageQueue);
                    //获取当前未完成的half queue的offset
                    long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                    //获取当前已完成的op queue的offset
                    long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                    log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                    if (halfOffset < 0 || opOffset < 0) {
                        log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                            halfOffset, opOffset);
                        continue;
                    }
                    ......
                    // single thread
                    int getMessageNullCount = 1;
                    long newOffset = halfOffset;
                    long i = halfOffset;
                    long nextOpOffset = pullResult.getNextBeginOffset();
                    int putInQueueCount = 0;
                    int escapeFailCnt = 0;
    
                    while (true) {
                        if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                            log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                            break;
                        }
                        if (removeMap.containsKey(i)) {
                            ......
                        } else {
                            //从RMQ_SYS_TRANS_HALF_TOPIC取出half消息
                            GetResult getResult = getHalfMsg(messageQueue, i);
                            MessageExt msgExt = getResult.getMsg();
                            if (msgExt == null) {
                                if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                    break;
                                }
                        	......
                            ......
                            //是否需要丢弃消息
                            if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                                listener.resolveDiscardMsg(msgExt);
                                newOffset = i + 1;
                                i++;
                                continue;
                            }
                            ......
                            //判断上次check是否超时
                            boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
                                || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
                                || valueOfCurrentMinusBorn <= -1;
    
                            if (isNeedCheck) {
                                //超时
                                if (!putBackHalfMsgQueue(msgExt, i)) {
                                    continue;
                                }
                                putInQueueCount++;
                                log.info("Check transaction. real_topic={},uniqKey={},offset={},commitLogOffset={}",
                                        msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
                                        msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                                        msgExt.getQueueOffset(), msgExt.getCommitLogOffset());
                                //重新给TransactionListener发起check请求
                                listener.resolveHalfMsg(msgExt);
                    ......
                    ......
                    ......
                    if (newOffset != halfOffset) {
                        transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                    }
                    long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                    if (newOpOffset != opOffset) {
                        transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                    }                            
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    上述代码中有三个比较重要的细节,needDiscard、putBackHalfMsgQueue和listener.resolveHalfMsg。
    needDiscard:从half queue取出来后判断消息的TRANSACTION_CHECK_TIMES属性是否大于15次。
    小于15次,则TRANSACTION_CHECK_TIMES属性值+1。
    大于15次,则从RMQ_SYS_TRANS_HALF_TOPIC中丢弃,通过listener.resolveDiscardMsg保存在TRANS_CHECK_MAX_TIME_TOPIC中交由人工处理。
    putBackHalfMsgQueue:将消息重新插入一份到RMQ_SYS_TRANS_HALF_TOPIC,因为CommitLog的applyOnly特性不能修改原消息。所以需要重新apply消息导致queueOffset、commitLogOffset、msgId都发生了变化。

        private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
            PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
            if (putMessageResult != null
                && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                msgExt.setQueueOffset(
                    putMessageResult.getAppendMessageResult().getLogicsOffset());
                msgExt.setCommitLogOffset(
                    putMessageResult.getAppendMessageResult().getWroteOffset());
                msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
                log.debug(
                    "Send check message, the offset={} restored in queueOffset={} "
                        + "commitLogOffset={} "
                        + "newMsgId={} realMsgId={} topic={}",
                    offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),
                    msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                    msgExt.getTopic());
                return true;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    listener.resolveHalfMsg:通过回调resolveHalfMsg方法向TransactionMQProducer重发check。

        public void resolveHalfMsg(final MessageExt msgExt) {
            if (executorService != null) {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            sendCheckMessage(msgExt);
                        } catch (Exception e) {
                            LOGGER.error("Send check message error!", e);
                        }
                    }
                });
            } else {
                LOGGER.error("TransactionalMessageCheckListener not init");
            }
        }
    
        public void sendCheckMessage(MessageExt msgExt) throws Exception {
            CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
            checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
            checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
            checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
            checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
            checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
            checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());
            msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
            msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
            msgExt.setStoreSize(0);
            String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
            if (channel != null) {
                //取出与broker相连的netty channel发送check消息
                brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
            } else {
                LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    half消息示意图

  • 相关阅读:
    Java中static关键字的使用与练习
    vue3 + Element-plus + Echarts 5.2 切换不更新、导出PDF不显示 解决方案
    使用wireshark解析ipsec esp包
    webrtc-m79-msvc编译H264
    华为云云耀云服务器L实例评测|利用云服务器部署个人博客站
    stm32的时钟、中断的配置(针对寄存器),一些基础知识
    宝兰德受邀出席第八届丝绸之路网络安全论坛
    【网页设计】基于HTML在线图书商城购物项目设计与实现_(图书商城10页) bootstarp响应式
    518晚会聚会抽奖软件,可用作游戏随机抽签选人
    离子液体1-丁基-3-甲基咪唑六氟磷酸盐(BMI)改性氧化石墨烯(GO)文献摘要
  • 原文地址:https://blog.csdn.net/qq276726581/article/details/134069274