• 【Seata源码学习 】篇四 TM事务管理器是如何开启全局事务


    【Seata源码学习 】篇三 TM开启全局事务的过程

    TM发送 单个或批量 消息

    以发送GlobalBeginRequest消息为例

    TM在执行拦截器链路前将向TC发送GlobalBeginRequest 消息

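    io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String) 最终委托给 io.seata.tm.DefaultTransactionManager#begin(java.lang.String, java.lang.String, java.lang.String, int),下面是后者的实现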

       /**
        * Asks the TC to begin a new global transaction and returns the XID it assigned.
        *
        * <p>Only {@code name} and {@code timeout} are copied into the outgoing
        * {@link GlobalBeginRequest}; {@code applicationId} and
        * {@code transactionServiceGroup} are not read by this method.
        *
        * @param applicationId           application id (unused here)
        * @param transactionServiceGroup transaction service group (unused here)
        * @param name                    transaction name placed on the request
        * @param timeout                 transaction timeout placed on the request
        * @return the XID carried by the TC's response
        * @throws TransactionException if the response result code is {@code ResultCode.Failed},
        *                              or if the underlying {@code syncCall} fails
        */
        @Override
        public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
            throws TransactionException {
            final GlobalBeginRequest beginRequest = new GlobalBeginRequest();
            beginRequest.setTimeout(timeout);
            beginRequest.setTransactionName(name);

            final GlobalBeginResponse beginResponse = (GlobalBeginResponse) syncCall(beginRequest);
            final boolean rejected = beginResponse.getResultCode() == ResultCode.Failed;
            if (rejected) {
                throw new TmTransactionException(TransactionExceptionCode.BeginFailed, beginResponse.getMsg());
            }
            return beginResponse.getXid();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    注意 消息TYPE_CODE 为 MessageType.TYPE_GLOBAL_BEGIN 值为 1

    package io.seata.core.protocol.transaction;
    
    import io.seata.core.protocol.MessageType;
    import io.seata.core.rpc.RpcContext;
    
    /**
     * Request sent to the TC to open a new global transaction.
     *
     * <p>It carries a transaction name and a timeout, and identifies itself on the
     * wire with {@link MessageType#TYPE_GLOBAL_BEGIN}.
     *
     * @author slievrly
     */
    public class GlobalBeginRequest extends AbstractTransactionRequestToTC {

        /** Transaction timeout; defaults to 60000 (presumably milliseconds — confirm with callers). */
        private int timeout = 60000;

        /** Name of the global transaction to begin. */
        private String transactionName;

        /**
         * Returns the configured timeout.
         *
         * @return the timeout
         */
        public int getTimeout() {
            return this.timeout;
        }

        /**
         * Replaces the timeout.
         *
         * @param timeout the new timeout
         */
        public void setTimeout(int timeout) {
            this.timeout = timeout;
        }

        /**
         * Returns the transaction name.
         *
         * @return the transaction name, possibly {@code null} if never set
         */
        public String getTransactionName() {
            return this.transactionName;
        }

        /**
         * Replaces the transaction name.
         *
         * @param transactionName the new transaction name
         */
        public void setTransactionName(String transactionName) {
            this.transactionName = transactionName;
        }

        /**
         * Identifies this message as a global-begin request.
         *
         * @return {@link MessageType#TYPE_GLOBAL_BEGIN}
         */
        @Override
        public short getTypeCode() {
            return MessageType.TYPE_GLOBAL_BEGIN;
        }

        /**
         * Dispatches this request to the inherited {@code handler}, which selects the
         * overload matching this concrete request type.
         *
         * @param rpcContext context of the connection the request arrived on
         * @return the response produced by the handler
         */
        @Override
        public AbstractTransactionResponse handle(RpcContext rpcContext) {
            return handler.handle(this, rpcContext);
        }

        /**
         * Renders the request as {@code timeout=<timeout>,transactionName=<name>}.
         */
        @Override
        public String toString() {
            return "timeout=" + timeout + "," + "transactionName=" + transactionName;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    io.seata.tm.DefaultTransactionManager#syncCall

        /**
         * Sends a transaction request through the TM Netty client and waits for the reply.
         *
         * @param request the request to send
         * @return the reply, cast to {@link AbstractTransactionResponse}
         * @throws TransactionException a {@link TmTransactionException} with code
         *                              {@code TransactionExceptionCode.IO} when the call times out
         */
        private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
            final Object reply;
            try {
                reply = TmNettyRemotingClient.getInstance().sendSyncRequest(request);
            } catch (TimeoutException timeout) {
                // Surface the RPC timeout as a transaction-level IO failure, keeping the cause.
                throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", timeout);
            }
            return (AbstractTransactionResponse) reply;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object)

    /**
     * Sends a request to a load-balanced server address and blocks until the reply
     * arrives or the RPC timeout elapses.
     *
     * <p>With client batch sending enabled, the message is queued in {@code basketMap}
     * (one queue per server address) and the calling thread waits on a
     * {@link MessageFuture} registered in {@code futures}. Otherwise a channel is
     * acquired and the message is sent directly via {@code super.sendSync}.
     *
     * @param msg the request body
     * @return the reply, or {@code null} if the message could not be queued
     * @throws TimeoutException if no reply arrives within the RPC timeout
     */
    @Override
        public Object sendSyncRequest(Object msg) throws TimeoutException {
            // Pick the server address for this transaction service group.
            String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
            // RPC timeout, used for the pending future and for the blocking wait below
            // (reportedly 30s by default — confirm against the client configuration).
            long timeoutMillis = this.getRpcRequestTimeout();
            // Wrap the request body in an RpcMessage flagged as a synchronous request.
            RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

            // send batch message
            // put message into basketMap, @see MergedSendRunnable
            // (client batch sending is reportedly disabled by default in 1.5 — confirm)
            if (this.isEnableClientBatchSendRequest()) {

                // send batch message is sync request, needs to create messageFuture and put it in futures.
                MessageFuture messageFuture = new MessageFuture();
                messageFuture.setRequestMessage(rpcMessage);
                messageFuture.setTimeout(timeoutMillis);
                futures.put(rpcMessage.getId(), messageFuture);

                // put message into basketMap: fetch (or lazily create) the queue for this server address
                BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                    key -> new LinkedBlockingQueue<>());
                // offer() on the default-constructed LinkedBlockingQueue is not expected to fail,
                // but if it does, nobody will ever complete the future registered above.
                if (!basket.offer(rpcMessage)) {
                    // Drop the pending future so it does not linger in the map after we give up.
                    futures.remove(rpcMessage.getId());
                    LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                            serverAddress, rpcMessage);
                    return null;
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("offer message: {}", rpcMessage.getBody());
                }
                // If the merge-send loop is not currently sending, wake the threads waiting on mergeLock.
                // The flag is read outside the lock, so this is only a best-effort hint.
                if (!isSending) {
                    synchronized (mergeLock) {
                        mergeLock.notifyAll();
                    }
                }

                try {
                    // Block the calling thread until the future is completed or the timeout elapses.
                    return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (Exception exx) {
                    LOGGER.error("wait response error:{},ip:{},request:{}",
                        exx.getMessage(), serverAddress, rpcMessage.getBody());
                    // Timeouts propagate unchanged; any other failure is wrapped unchecked.
                    if (exx instanceof TimeoutException) {
                        throw (TimeoutException) exx;
                    } else {
                        throw new RuntimeException(exx);
                    }
                }

            } else {
                // Batch sending disabled: acquire a channel and send this single message directly.
                Channel channel = clientChannelManager.acquireChannel(serverAddress);
                // sendSync registers a MessageFuture with the timeout in futures, writes the message,
                // and blocks for the reply. Its write listener only reacts to a FAILED write:
                // it removes the future, sets the failure cause as the result and destroys the channel.
                return super.sendSync(channel, rpcMessage, timeoutMillis);
            }

        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    单个发送消息

    如果是发送单个消息,则直接调用AbstractNettyRemoting.sendSync 向TC端发送消息

    io.seata.core.rpc.netty.AbstractNettyRemoting#sendSync

    /**
     * Writes a single message to the channel and blocks until its reply arrives.
     *
     * <p>A {@link MessageFuture} is registered in {@code futures} under the message id
     * before the write. If the write fails, the listener removes that future, sets the
     * failure cause as its result and destroys the channel.
     *
     * @param channel       channel to write to; when {@code null} nothing is sent
     * @param rpcMessage    message to send
     * @param timeoutMillis maximum time to wait for the reply, must be greater than 0
     * @return the reply, or {@code null} when {@code channel} is {@code null}
     * @throws TimeoutException   if the wait on the future times out
     * @throws FrameworkException if {@code timeoutMillis} is not positive
     * @throws RuntimeException   wrapping any other exception raised while waiting
     */
    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
            if (timeoutMillis <= 0) {
                throw new FrameworkException("timeout should more than 0ms");
            }
            if (channel == null) {
                LOGGER.warn("sendSync nothing, caused by null channel.");
                return null;
            }
    
            MessageFuture messageFuture = new MessageFuture();
            messageFuture.setRequestMessage(rpcMessage);
            // Record the timeout on the future; presumably used elsewhere for expiry detection
            // (System.currentTimeMillis() - start > timeout) — confirm in MessageFuture.
            messageFuture.setTimeout(timeoutMillis);
            futures.put(rpcMessage.getId(), messageFuture);
    
            channelWritableCheck(channel, rpcMessage.getBody());
    
            String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
            doBeforeRpcHooks(remoteAddr, rpcMessage);
    
            channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    // Write failed: remove the pending future by message id so it is no longer tracked.
                    MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                    if (messageFuture1 != null) {
                        // Hand the failure cause to the waiter as the result.
                        messageFuture1.setResultMessage(future.cause());
                    }
                    // Tear down the connection the write failed on.
                    destroyChannel(future.channel());
                }
            });
    
            try {
                // Block until the future is completed or the timeout elapses.
                Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
                doAfterRpcHooks(remoteAddr, rpcMessage, result);
                return result;
            } catch (Exception exx) {
                LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
                    rpcMessage.getBody());
                // Timeouts propagate unchanged; anything else is wrapped unchecked.
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException) exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS) 最多阻塞等待 timeoutMillis(默认 30s)以获取响应结果,超时则抛出 TimeoutException 异常

    批量发送消息

     // Excerpt: the batch-send branch of sendSyncRequest.
     // send batch message is sync request, needs to create messageFuture and put it in futures.
                // Wrap the message in a MessageFuture and register it in the futures map by message id.
                MessageFuture messageFuture = new MessageFuture();
                messageFuture.setRequestMessage(rpcMessage);
                messageFuture.setTimeout(timeoutMillis);
                futures.put(rpcMessage.getId(), messageFuture);
    
                // put message into basketMap
                // Fetch (or lazily create) the message queue for this server address.
                BlockingQueue basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                    key -> new LinkedBlockingQueue<>());
                // Enqueue the message; offer() is not expected to fail on the
                // default-constructed LinkedBlockingQueue.
                if (!basket.offer(rpcMessage)) {
                    LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                            serverAddress, rpcMessage);
                    return null;
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("offer message: {}", rpcMessage.getBody());
                }
                // If the merge-send loop is not currently sending, take mergeLock and wake every waiting thread.
                // NOTE(review): isSending is reportedly volatile (visibility/ordering only, not atomicity) — confirm.
                if (!isSending) {
                    synchronized (mergeLock) {
                        mergeLock.notifyAll();
                    }
                }
    
                try {
                    // Block the calling thread until the future is completed or the timeout elapses.
                    return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (Exception exx) {
                    // Log the failure, then rethrow: timeouts unchanged, anything else wrapped unchecked.
                    LOGGER.error("wait response error:{},ip:{},request:{}",
                        exx.getMessage(), serverAddress, rpcMessage.getBody());
                    if (exx instanceof TimeoutException) {
                        throw (TimeoutException) exx;
                    } else {
                        throw new RuntimeException(exx);
                    }
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    如果开启了批量发送,消息会先被放入 basketMap 集合中。AbstractNettyRemotingClient 在初始化时会启动一个核心线程数和最大线程数都为 1 的单线程线程池,并提交 MergedSendRunnable 任务;该任务在死循环中不断遍历 basketMap,取出每个服务端地址对应的待发送消息队列,合并后最终由 io.seata.core.rpc.netty.AbstractNettyRemoting#sendAsync 异步发送。需要注意的是,不管是单个发送还是批量发送,TM 开启事务所在的线程都会阻塞在 messageFuture.get 上(带超时),只不过批量模式下消息的发送和返回结果的获取都变成了异步。

    /**
     * Merge-send loop: repeatedly waits on {@code mergeLock}, then drains every
     * per-address queue in {@code basketMap} into one {@link MergedWarpMessage} and
     * sends it asynchronously. Replies are not awaited here; on a send failure the
     * pending futures of the merged messages are failed immediately.
     */
    public void run() {
                // Loop forever.
                while (true) {
                    // Take the lock so we can wait on it.
                    synchronized (mergeLock) {
                        // Wait up to MAX_MERGE_SEND_MILLS (releasing the lock meanwhile),
                        // or until a sender calls notifyAll().
                        try {
                            mergeLock.wait(MAX_MERGE_SEND_MILLS);
                        } catch (InterruptedException e) {
                            // Interruption is ignored: the loop simply proceeds to the next drain.
                        }
                    }
                    isSending = true;
                    // Visit every (server address, queue) entry.
                    basketMap.forEach((address, basket) -> {
                        if (basket.isEmpty()) {
                            return;
                        }
    
                        MergedWarpMessage mergeMessage = new MergedWarpMessage();
                        // Drain every message queued for this server address so they go out as one merged message.
                        while (!basket.isEmpty()) {
                            RpcMessage msg = basket.poll();
                            // Collect each message body and its id into the merged message.
                            mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                            mergeMessage.msgIds.add(msg.getId());
                        }
                        if (mergeMessage.msgIds.size() > 1) {
                            printMergeMessageLog(mergeMessage);
                        }
                        Channel sendChannel = null;
                        try {
                            // send batch message is sync request, but there is no need to get the return value.
                            // Since the messageFuture has been created before the message is placed in basketMap,
                            // the return value will be obtained in ClientOnResponseProcessor.
                            // In other words: this thread does not block for the reply.
                            sendChannel = clientChannelManager.acquireChannel(address);
                            AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                        } catch (FrameworkException e) {
                            if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                                destroyChannel(address, sendChannel);
                            }
                            // fast fail: complete every pending future of this batch with an error
                            // so the waiting callers do not have to sit out the full timeout.
                            for (Integer msgId : mergeMessage.msgIds) {
                                MessageFuture messageFuture = futures.remove(msgId);
                                if (messageFuture != null) {
                                    messageFuture.setResultMessage(
                                        new RuntimeException(String.format("%s is unreachable", address), e));
                                }
                            }
                            LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                        }
                    });
                    isSending = false;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    接收异步响应消息由 TM 初始化时注册的 ClientOnResponseProcessor 负责处理:它会遍历所有合并的消息,根据消息 ID 将对应的 MessageFuture 从 futures 中移除,并调用 future.setResultMessage 设置结果,此时 TM 发送消息时阻塞等待的线程就会被唤醒。

    io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process

      /**
       * Routes a response from the server to the {@link MessageFuture} waiting for it.
       *
       * <p>Three shapes are handled: a {@link MergeResultMessage} (results matched
       * positionally against the merged request cached in {@code mergeMsgMap}), a
       * {@link BatchResultMessage} (results matched by the ids it carries), and a
       * plain response (matched by the RpcMessage id). A plain response with no
       * waiting future is forwarded to {@code transactionMessageHandler} if it is an
       * {@link AbstractResultMessage} and a handler is set.
       *
       * @param ctx        channel handler context the response arrived on
       * @param rpcMessage the response message
       * @throws Exception declared by the processor contract
       */
      @Override
        public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
            final Object body = rpcMessage.getBody();

            // Reply to a merged request: the i-th result belongs to the i-th merged message id.
            if (body instanceof MergeResultMessage) {
                final MergeResultMessage mergedResult = (MergeResultMessage) body;
                final MergedWarpMessage mergedRequest = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());
                for (int idx = 0; idx < mergedRequest.msgs.size(); idx++) {
                    final int pendingId = mergedRequest.msgIds.get(idx);
                    final MessageFuture pending = futures.remove(pendingId);
                    if (pending != null) {
                        // Completing the future releases the thread blocked on it.
                        pending.setResultMessage(mergedResult.getMsgs()[idx]);
                    } else {
                        LOGGER.error("msg: {} is not found in futures, result message: {}", pendingId,mergedResult.getMsgs()[idx]);
                    }
                }
                return;
            }

            // Batch reply: ids and results travel together in the message itself.
            if (body instanceof BatchResultMessage) {
                try {
                    final BatchResultMessage batchResult = (BatchResultMessage) body;
                    for (int idx = 0; idx < batchResult.getMsgIds().size(); idx++) {
                        final int pendingId = batchResult.getMsgIds().get(idx);
                        final MessageFuture pending = futures.remove(pendingId);
                        if (pending != null) {
                            pending.setResultMessage(batchResult.getResultMessages().get(idx));
                        } else {
                            LOGGER.error("msg: {} is not found in futures, result message: {}", pendingId, batchResult.getResultMessages().get(idx));
                        }
                    }
                } finally {
                    // In order to be compatible with the old version, in the batch sending of version 1.5.0,
                    // batch messages will also be placed in the local cache of mergeMsgMap,
                    // but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMap
                    mergeMsgMap.clear();
                }
                return;
            }

            // Plain reply: match by the RpcMessage id.
            final MessageFuture pending = futures.remove(rpcMessage.getId());
            if (pending != null) {
                pending.setResultMessage(body);
                return;
            }
            if (body instanceof AbstractResultMessage && transactionMessageHandler != null) {
                transactionMessageHandler.onResponse((AbstractResultMessage) body, null);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    TC 处理 GlobalBeginRequest 消息

    NettyChannel消息处理

    io.seata.core.rpc.netty.NettyRemotingServer#init

     /**
      * Registers the message processors, then runs the parent initialization at most once.
      *
      * <p>{@code registerProcessor()} runs on every call; {@code super.init()} runs only
      * for the caller that flips {@code initialized} from {@code false} to {@code true}.
      */
     @Override
        public void init() {
            // registry processor
            registerProcessor();
            if (!initialized.compareAndSet(false, true)) {
                return;
            }
            super.init();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    io.seata.core.rpc.netty.NettyRemotingServer#registerProcessor

    /**
     * Wires every supported message type to its processor and executor.
     *
     * <p>Request messages (including {@code TYPE_GLOBAL_BEGIN}) share one
     * {@link ServerOnRequestProcessor}; branch commit/rollback results share one
     * {@link ServerOnResponseProcessor}; RM registration, TM registration and
     * heartbeat each get a dedicated processor.
     */
    private void registerProcessor() {
            // 1. registry on request message processor
            ServerOnRequestProcessor requestProcessor = new ServerOnRequestProcessor(this, getHandler());
            ShutdownHook.getInstance().addDisposable(requestProcessor);
            // Same types, same order as before; TYPE_GLOBAL_BEGIN is what GlobalBeginRequest is sent as.
            final short[] requestTypes = {
                MessageType.TYPE_BRANCH_REGISTER,
                MessageType.TYPE_BRANCH_STATUS_REPORT,
                MessageType.TYPE_GLOBAL_BEGIN,
                MessageType.TYPE_GLOBAL_COMMIT,
                MessageType.TYPE_GLOBAL_LOCK_QUERY,
                MessageType.TYPE_GLOBAL_REPORT,
                MessageType.TYPE_GLOBAL_ROLLBACK,
                MessageType.TYPE_GLOBAL_STATUS,
                MessageType.TYPE_SEATA_MERGE
            };
            for (short requestType : requestTypes) {
                super.registerProcessor(requestType, requestProcessor, messageExecutor);
            }

            // 2. registry on response message processor
            ServerOnResponseProcessor responseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
            final short[] branchResultTypes = {
                MessageType.TYPE_BRANCH_COMMIT_RESULT,
                MessageType.TYPE_BRANCH_ROLLBACK_RESULT
            };
            for (short resultType : branchResultTypes) {
                super.registerProcessor(resultType, responseProcessor, branchResultMessageExecutor);
            }

            // 3. registry rm message processor
            super.registerProcessor(MessageType.TYPE_REG_RM, new RegRmProcessor(this), messageExecutor);

            // 4. registry tm message processor (no executor supplied)
            super.registerProcessor(MessageType.TYPE_REG_CLT, new RegTmProcessor(this), null);

            // 5. registry heartbeat message processor (no executor supplied)
            super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, new ServerHeartbeatProcessor(this), null);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    进一步跟进查看具体如何处理

    io.seata.core.rpc.processor.server.ServerOnRequestProcessor#process

        /**
         * Entry point for request messages arriving at the server.
         *
         * <p>Messages from a registered channel are handed to {@code onRequestMessage};
         * for an unregistered channel the connection is disconnected and closed instead.
         *
         * @param ctx        channel handler context the message arrived on
         * @param rpcMessage the incoming message
         * @throws Exception declared by the processor contract
         */
        public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
            // Registered channel: normal request handling.
            if (ChannelManager.isRegistered(ctx.channel())) {
                onRequestMessage(ctx, rpcMessage);
                return;
            }

            // Unregistered channel: drop the connection; failures while closing are only logged.
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            } catch (Exception closeFailure) {
                LOGGER.error(closeFailure.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    进入io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage

    /**
     * Handles one request message received from a registered client channel.
     *
     * <p>A {@link MergedWarpMessage} is unpacked and each sub-message handled, either
     * with one response per sub-message (when TC batch responses are enabled and the
     * client version is at least 1.5.0) or with a single {@link MergeResultMessage}
     * whose results are in the same order as the merged sub-messages. Any other
     * {@link AbstractMessage} is handled on its own and answered asynchronously.
     * Bodies that are not an {@link AbstractMessage} are ignored.
     *
     * @param ctx        channel handler context the message arrived on
     * @param rpcMessage the incoming message
     */
    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
            Object message = rpcMessage.getBody();
            RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,
                    NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
            } else {
                try {
                    BatchLogHandler.INSTANCE.getLogQueue()
                        .put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"
                            + rpcContext.getTransactionServiceGroup());
                } catch (InterruptedException e) {
                    LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
                }
            }
            // Only AbstractMessage bodies are handled (GlobalBeginRequest is one, via AbstractTransactionRequest).
            if (!(message instanceof AbstractMessage)) {
                return;
            }
            // the batch send request message
            if (message instanceof MergedWarpMessage) {
                final List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
                // Batch-response mode: TC batch response enabled and a client version >= 1.5.0.
                if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
                    && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
                    List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
                    for (int i = 0; i < msgs.size(); i++) {
                        AbstractMessage msg = msgs.get(i);
                        int msgId = msgIds.get(i);
                        // Optionally handle each sub-message concurrently.
                        if (PARALLEL_REQUEST_HANDLE) {
                            CompletableFuture.runAsync(
                                () -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                        } else {
                            handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                        }
                    }
                } else {
                    // Results are gathered on this thread only, in sub-message order.
                    List<AbstractResultMessage> results = new ArrayList<>();
                    List<CompletableFuture<AbstractResultMessage>> completableFutures = null;
                    for (int i = 0; i < msgs.size(); i++) {
                        if (PARALLEL_REQUEST_HANDLE) {
                            if (completableFutures == null) {
                                completableFutures = new ArrayList<>();
                            }
                            int finalI = i;
                            // Each task returns its own result instead of inserting into a shared list
                            // by index: tasks may finish out of order, and an indexed insert past the
                            // current size would throw IndexOutOfBoundsException.
                            completableFutures.add(CompletableFuture.supplyAsync(
                                () -> handleRequestsByMergedWarpMessage(msgs.get(finalI), rpcContext)));
                        } else {
                            // Sequential mode: handle and append in order.
                            results.add(i, handleRequestsByMergedWarpMessage(msgs.get(i), rpcContext));
                        }
                    }
                    if (CollectionUtils.isNotEmpty(completableFutures)) {
                        try {
                            // Joining in submission order keeps results aligned with the sub-messages.
                            for (CompletableFuture<AbstractResultMessage> completableFuture : completableFutures) {
                                results.add(completableFuture.get());
                            }
                        } catch (InterruptedException | ExecutionException e) {
                            LOGGER.error("handle request error: {}", e.getMessage(), e);
                        }
                    }
                    MergeResultMessage resultMessage = new MergeResultMessage();
                    resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
                    remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
                }
            } else {
                // the single send request message
                final AbstractMessage msg = (AbstractMessage) message;
                AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
                remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    io.seata.core.rpc.processor.server.ServerOnRequestProcessor#handleRequestsByMergedWarpMessage

    /**
     * Handles one sub-message of a merged request by delegating to the transaction message handler.
     *
     * @param subMessage the sub-message taken from the merged request
     * @param rpcContext context of the connection the merged request arrived on
     * @return the handler's result for this sub-message
     */
    private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage subMessage, RpcContext rpcContext) {
        final AbstractResultMessage subResult = transactionMessageHandler.onRequest(subMessage, rpcContext);
        return subResult;
    }
    
    • 1
    • 2
    • 3

    io.seata.server.coordinator.DefaultCoordinator#onRequest

    /**
     * Dispatches a request addressed to the TC.
     *
     * <p>The request is given this coordinator as its inbound handler and then asked
     * to handle itself, which routes to the overload matching its concrete type.
     *
     * @param request the incoming request; must be an {@link AbstractTransactionRequestToTC}
     * @param context context of the connection the request arrived on
     * @return the response produced by handling the request
     * @throws IllegalArgumentException if {@code request} is {@code null} or not an
     *                                  {@link AbstractTransactionRequestToTC}
     */
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
            if (!(request instanceof AbstractTransactionRequestToTC)) {
                // Name the offending type so a misrouted message can be diagnosed from the stack trace.
                throw new IllegalArgumentException("unsupported request type: "
                    + (request == null ? "null" : request.getClass().getName()));
            }
            AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
            transactionRequest.setTCInboundHandler(this);

            return transactionRequest.handle(context);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    io.seata.core.protocol.transaction.GlobalBeginRequest#handle

    /**
     * Hands this request to the {@code handler} field, passing {@code this} so the
     * overload for the concrete request type is chosen.
     *
     * @param rpcContext context of the connection the request arrived on
     * @return the response produced by the handler
     */
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
            final AbstractTransactionResponse outcome = handler.handle(this, rpcContext);
            return outcome;
        }
    
    • 1
    • 2
    • 3

    io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalBeginRequest, io.seata.core.rpc.RpcContext)

      /**
       * Handles a global-begin request inside {@code exceptionHandleTemplate} and returns
       * the response object that the callback fills in.
       *
       * <p>A {@code StoreException} raised while beginning the transaction is rethrown as a
       * {@code TransactionException} with code {@code FailedStore}, keeping the original
       * exception as the cause.
       *
       * @param request    the global-begin request
       * @param rpcContext the RPC context passed on to {@code doGlobalBegin}
       * @return the response instance created here and passed to the callback
       */
      public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
            GlobalBeginResponse response = new GlobalBeginResponse();
            // Explicit type arguments are required: with a raw AbstractCallback the
            // execute(GlobalBeginRequest, GlobalBeginResponse) method below would not
            // override the callback's generic execute method.
            exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
                @Override
                public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
                    try {
                        // Perform the actual processing of the begin message.
                        doGlobalBegin(request, response, rpcContext);
                    } catch (StoreException e) {
                        throw new TransactionException(TransactionExceptionCode.FailedStore,
                            String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
                            e);
                    }
                }
            }, request, response);
            return response;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    获取全局事务XID

    io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

        /**
         * Begins a new global transaction through {@code core} and stores the returned XID
         * in the response; logs the transaction details at INFO level when enabled.
         *
         * @param request    supplies the transaction name and timeout
         * @param response   receives the XID of the new global transaction
         * @param rpcContext supplies the application id and transaction service group
         * @throws TransactionException if {@code core.begin} fails
         */
        protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
                throws TransactionException {
            final String applicationId = rpcContext.getApplicationId();
            final String serviceGroup = rpcContext.getTransactionServiceGroup();
            final String transactionName = request.getTransactionName();
            final int timeout = request.getTimeout();

            // Create and begin the transaction, then hand its XID back via the response.
            final String xid = core.begin(applicationId, serviceGroup, transactionName, timeout);
            response.setXid(xid);

            if (!LOGGER.isInfoEnabled()) {
                return;
            }
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                    applicationId, serviceGroup, transactionName, timeout, response.getXid());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    全局事务会话持久化

    io.seata.server.coordinator.DefaultCore#begin

        /**
         * Creates a global session, registers the root session manager as its lifecycle
         * listener, begins the session, publishes a metrics event and returns the XID.
         *
         * @param applicationId           id of the application starting the transaction
         * @param transactionServiceGroup transaction service group of the caller
         * @param name                    transaction name
         * @param timeout                 transaction timeout
         * @return the XID of the newly begun global session
         * @throws TransactionException declared by the signature; presumably raised by
         *         {@code begin()} or its listeners (confirm against the session code)
         */
        @Override
        public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
            throws TransactionException {
            // Build the global session object. Per the original note, its XID is produced
            // by XID.generateXID.
            final GlobalSession globalSession =
                GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);

            // Publish the XID into the logging MDC.
            MDC.put(RootContext.MDC_KEY_XID, globalSession.getXid());

            // Original author's notes (not verifiable from this snippet alone):
            //  - SessionHolder is initialised in io.seata.server.Server.start, which reads the
            //    configured session store mode and calls SessionHolder.init(sessionStoreMode).
            //  - SessionHolder.getRootSessionManager() resolves the manager through Seata's SPI:
            //    it looks under META-INF/services/ and META-INF/seata/ for a file named
            //    io.seata.server.session.SessionManager and picks the implementation whose
            //    @LoadLevel name matches.
            //  - With the DB store mode, for example, a DataBaseSessionManager is returned.
            // The listener must be registered before begin() so that it observes the begin.
            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

            // Begin the transaction. Per the original note this sets the session status to 1,
            // records the begin time and active flag, and invokes onBegin on each
            // SessionLifecycleListener.
            globalSession.begin();

            // Publish the transaction-start event.
            MetricsPublisher.postSessionDoingEvent(globalSession, false);

            // Hand the XID back to the caller.
            return globalSession.getXid();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    Seata 的 SPI 机制以 EnhancedServiceLoader.load(类, 名称) 的第一个参数(接口的全限定类名)作为文件名,在 META-INF/services/ 与 META-INF/seata/ 目录下查找对应的配置文件,读取其中列出的实现类的全限定类名;再用第二个参数(名称)与各实现类上 @LoadLevel 注解的 name 值进行匹配,从而确定要加载的对象,如下所示

    image-20231121152441877

    加载 DataBaseSessionManager 对象后,将其添加到 session 的生命周期监听器列表中;执行 session.begin 方法时会调用监听器的 onBegin 方法,该方法由父类 AbstractSessionManager 实现,其内部再调用 DataBaseSessionManager 重写的 addGlobalSession 方法

    io.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession

        /**
         * Persists the given global session through the transaction store manager.
         *
         * <p>When {@code taskName} is blank the session is written with
         * {@code GLOBAL_ADD}; otherwise it is written with {@code GLOBAL_UPDATE}.
         *
         * @param session the global session to store
         * @throws TransactionException declared by the signature
         * @throws StoreException if the store manager reports that the write did not succeed
         */
        @Override
        public void addGlobalSession(GlobalSession session) throws TransactionException {
            // A blank task name selects an insert; any other value selects an update.
            final LogOperation operation = StringUtils.isBlank(taskName)
                ? LogOperation.GLOBAL_ADD
                : LogOperation.GLOBAL_UPDATE;

            // Write the global session information to the store.
            final boolean written = transactionStoreManager.writeSession(operation, session);
            if (written) {
                return;
            }
            throw new StoreException("addGlobalSession failed.");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    io.seata.server.storage.db.store.DataBaseTransactionStoreManager#writeSession

        /**
         * Routes a session write to the matching {@code logStore} operation.
         *
         * <p>Global operations convert the session with
         * {@code SessionConverter.convertGlobalTransactionDO}; branch operations use
         * {@code SessionConverter.convertBranchTransactionDO}.
         *
         * @param logOperation the kind of write to perform
         * @param session      the session to convert and persist
         * @return the boolean result of the selected {@code logStore} call
         * @throws StoreException if {@code logOperation} is not one of the handled values
         */
        public boolean writeSession(LogOperation logOperation, SessionStorable session) {
            switch (logOperation) {
                case GLOBAL_ADD:
                    // Per the original note, logStore wraps a DataSource and builds the
                    // insert statement appropriate to the database type.
                    return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
                case GLOBAL_UPDATE:
                    return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
                case GLOBAL_REMOVE:
                    return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
                case BRANCH_ADD:
                    return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
                case BRANCH_UPDATE:
                    return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
                case BRANCH_REMOVE:
                    return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
                default:
                    throw new StoreException("Unknown LogOperation:" + logOperation.name());
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    io.seata.server.storage.db.store.LogStoreDataBaseDAO#insertGlobalTransactionDO

    /**
     * Inserts one global transaction record using the insert statement supplied by
     * {@code LogStoreSqlsFactory} for the configured database type and global table.
     *
     * <p>The transaction name is truncated to {@code transactionNameColumnSize} characters
     * when it is longer; a {@code null} name is bound as-is instead of causing a
     * {@code NullPointerException}.
     *
     * @param globalTransactionDO the record to insert
     * @return {@code true} if the statement affected at least one row
     * @throws StoreException wrapping any {@code SQLException} raised while inserting
     */
    public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
        // Build the insert statement for the global table (global_table by default, per the
        // original note) to persist the global transaction session.
        String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            int index = 1;
            conn = logStoreDataSource.getConnection();
            // Auto-commit: the insert is committed as soon as it executes.
            conn.setAutoCommit(true);
            ps = conn.prepareStatement(sql);
            ps.setString(index++, globalTransactionDO.getXid());
            ps.setLong(index++, globalTransactionDO.getTransactionId());
            ps.setInt(index++, globalTransactionDO.getStatus());
            ps.setString(index++, globalTransactionDO.getApplicationId());
            ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
            String transactionName = globalTransactionDO.getTransactionName();
            // Truncate only when a name is present and exceeds the column size.
            if (transactionName != null && transactionName.length() > transactionNameColumnSize) {
                transactionName = transactionName.substring(0, transactionNameColumnSize);
            }
            ps.setString(index++, transactionName);
            ps.setInt(index++, globalTransactionDO.getTimeout());
            ps.setLong(index++, globalTransactionDO.getBeginTime());
            ps.setString(index++, globalTransactionDO.getApplicationData());
            return ps.executeUpdate() > 0;
        } catch (SQLException e) {
            throw new StoreException(e);
        } finally {
            // Release the statement and the connection on every path.
            IOUtil.close(ps, conn);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    最终生成的SQL如下所示

    insert into global_table(xid, transaction_id, status, application_id, transaction_service_group, transaction_name,
                             timeout, begin_time, application_data, gmt_create, gmt_modified)
    values (全局事务XID, 事务id, 事务状态,应用id, 事务分组, 事务名称, 超时时间, 开始时间, 应用数据, now(), now())
    
    • 1
    • 2
    • 3

    数据库中的数据

    image-20231121214708246

    返回GlobalBeginResponse消息

    创建并开启全局事务后,将得到的XID封装到GlobalBeginResponse中,最终由remotingServer.sendAsyncResponse将GlobalBeginResponse消息返回给TM

    TM处理 GlobalBeginResponse 消息

    seata服务端返回 GlobalBeginResponse 消息后,TM 仍然由 io.seata.core.rpc.processor.client.ClientOnResponseProcessor#process 处理接收到的消息:通过调用 future.setResultMessage 设置消息结果,唤醒因发送 GlobalBeginRequest 消息而阻塞等待的 TM 线程,并将结果返回给它

    io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

    /**
     * Begins a global transaction when this object has the {@code Launcher} role.
     *
     * <p>For any other role the call only asserts that an XID is already present and
     * returns. For the launcher, it asserts that no XID is held, rejects the call if one is
     * already bound in {@code RootContext}, obtains a new XID from the transaction manager,
     * sets the status to {@code Begin} and binds the XID to {@code RootContext}.
     *
     * @param timeout the transaction timeout passed to the transaction manager
     * @param name    the transaction name passed to the transaction manager
     * @throws TransactionException declared by the signature; presumably raised by
     *         {@code transactionManager.begin} (confirm against that interface)
     * @throws IllegalStateException if {@code RootContext} already holds an XID
     */
    public void begin(int timeout, String name) throws TransactionException {
        final boolean launcher = role == GlobalTransactionRole.Launcher;
        if (!launcher) {
            // Not the launcher: begin() is a no-op apart from the sanity check and debug log.
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }

        assertXIDNull();
        final String boundXid = RootContext.getXID();
        if (boundXid != null) {
            throw new IllegalStateException("Global transaction already exists," +
                " can't begin a new global transaction, currentXid = " + boundXid);
        }

        // Obtain the XID; per the original note it is the one returned by the Seata server.
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;

        // Bind the XID to RootContext.
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    获取到XID后,将XID绑定到RootContext中,至此,全局事务的开启过程也就结束了。

    总结流程图

    TM开启全局事务过程

  • 相关阅读:
    ubuntu20添加一个硬盘
    vue2 vant-ui 实现搜索过滤、高亮功能
    HTTP 413 Request Entity Too Large(Payload Too Large)
    遥感语义分割、变化检测论文小trick合集(持续更新)
    rabbitmq资料汇总
    git 上传大文件
    udp协议下sendto与recvfrom函数对应的errno
    最受欢迎的程序员副业排行榜TOP6
    SpringCloud文件上传
    TiDB Lightning 并行导入
  • 原文地址:https://blog.csdn.net/JAVAlife2021/article/details/134542760