[Seata Source Code Study] Part 2: TM and RM Initialization

    1. GlobalTransactionScanner Initialization

    GlobalTransactionScanner implements Spring's InitializingBean interface, so its custom initialization method runs after the bean's properties have been set.

    io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet

    @Override
    public void afterPropertiesSet() {
        // if global transactions are disabled, do nothing
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        // initialize the TM and RM clients
        initClient();
    }
    
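    Before moving on to initClient, note that this lifecycle only runs if GlobalTransactionScanner is registered as a Spring bean. A minimal sketch of manual registration follows (not from this article; the bean name, application id, and group value are illustrative, and with seata-spring-boot-starter this bean is auto-configured instead):

    import io.seata.spring.annotation.GlobalTransactionScanner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class SeataClientConfig {
        @Bean
        public GlobalTransactionScanner globalTransactionScanner() {
            // args: applicationId, txServiceGroup
            return new GlobalTransactionScanner("demo-app", "my_tx_group");
        }
    }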

    io.seata.spring.annotation.GlobalTransactionScanner#initClient

    private void initClient() {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Initializing Global Transaction Clients ... ");
            }
            if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
                LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
                        "please change your default configuration as soon as possible " +
                        "and we don't recommend you to use default tx-service-group's value provided by seata",
                        DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
            }
            if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
                throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
            }
            // init TM: initialize the Transaction Manager client
            TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
            // init RM: initialize the Resource Manager client
            RMClient.init(applicationId, txServiceGroup);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
            }
    
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global Transaction Clients are initialized. ");
            }
            // register a callback so the clients shut down when the Spring context closes
            registerSpringShutdownHook();
    
        }
    


    2. Initializing the Transaction Manager (TM)

    Flow chart of the TM initialization process (image)

    Instantiating TmNettyRemotingClient

    io.seata.tm.TMClient#init(java.lang.String, java.lang.String)

    (initClient above actually calls the four-argument overload, shown further below, which additionally passes accessKey and secretKey; both overloads obtain the TmNettyRemotingClient instance and call its init method.)

    public static void init(String applicationId, String transactionServiceGroup) {
        // the Netty client the TM uses for network communication;
        // applicationId: the current application's id; transactionServiceGroup: the transaction group name
        TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        tmNettyRemotingClient.init();
    }
    
    

    First, look at how the instance is obtained.

    io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)

    public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
            TmNettyRemotingClient tmNettyRemotingClient = getInstance();
            tmNettyRemotingClient.setApplicationId(applicationId);
            tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
            return tmNettyRemotingClient;
        }
    

    io.seata.core.rpc.netty.TmNettyRemotingClient#getInstance()

    public static TmNettyRemotingClient getInstance() {
        // double-checked locking to guarantee a single instance
        if (instance == null) {
            synchronized (TmNettyRemotingClient.class) {
                if (instance == null) {
                    // Netty client configuration
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    // message-processing thread pool:
                    // core and maximum pool sizes are both 16, so there are no non-core threads;
                    // the work queue is a bounded blocking queue with a capacity of 200
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                            nettyClientConfig.getClientWorkerThreads()),
                        RejectedPolicies.runsOldestTaskPolicy());
                    // create the singleton instance
                    instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }
    
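    The rejection policy deserves a pause: RejectedPolicies.runsOldestTaskPolicy() keeps the newest work by evicting the oldest queued task when the bounded queue is full. A rough sketch of that behavior (an approximation, not the verbatim Seata source):

    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;

    // Sketch of a "runs oldest task" rejection policy: when the bounded queue
    // is full, evict the oldest queued task, run it on the calling thread,
    // and try to enqueue the newly rejected task in its place.
    public final class RunsOldestTaskPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                return; // pool is shutting down; drop the task
            }
            Runnable oldest = executor.getQueue().poll();    // evict the oldest queued task
            boolean enqueued = executor.getQueue().offer(r); // make room for the new task
            if (oldest != null) {
                oldest.run(); // run the evicted task on the calling thread
            }
            if (!enqueued) {
                executor.execute(r); // retry if the queue filled up again in between
            }
        }
    }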

    io.seata.core.rpc.netty.TmNettyRemotingClient#TmNettyRemotingClient

    private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,
                                  EventExecutorGroup eventExecutorGroup,
                                  ThreadPoolExecutor messageExecutor) {
        // call the parent constructor with the TM transaction role
        super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);
        // load the AuthSigner signing component via Seata's SPI mechanism
        this.signer = EnhancedServiceLoader.load(AuthSigner.class);
        // set enableClientBatchSendRequest
        // whether batch sending of requests is enabled; defaults to false
        this.enableClientBatchSendRequest = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,
                DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST);
        // listen for changes to this configuration item
        ConfigurationCache.addConfigListener(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST, new ConfigurationChangeListener() {
            @Override
            public void onChangeEvent(ConfigurationChangeEvent event) {
                String dataId = event.getDataId();
                String newValue = event.getNewValue();
                if (ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST.equals(dataId) && StringUtils.isNotBlank(newValue)) {
                    enableClientBatchSendRequest = Boolean.parseBoolean(newValue);
                }
            }
        });
    }
    
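    A brief aside on EnhancedServiceLoader: it is Seata's SPI loader, conceptually similar to the JDK's ServiceLoader. A minimal usage sketch (the provider file shown is the standard Java SPI location; the exact lookup paths depend on the Seata version):

    // Load the highest-priority AuthSigner implementation found on the classpath.
    // Implementations are declared in provider-configuration files, e.g.
    //   META-INF/services/io.seata.core.auth.AuthSigner
    AuthSigner signer = EnhancedServiceLoader.load(AuthSigner.class);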

    Instantiating AbstractNettyRemotingClient

    io.seata.core.rpc.netty.AbstractNettyRemotingClient#AbstractNettyRemotingClient

    public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
                                       ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
        // call the parent constructor with the message-processing thread pool
        super(messageExecutor);
        // the transaction role of this client (TM or RM)
        this.transactionRole = transactionRole;
        // create the NettyClientBootstrap instance
        clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
        // set the inbound message handler
        clientBootstrap.setChannelHandlers(new ClientHandler());
        // the channel manager
        clientChannelManager = new NettyClientChannelManager(
            new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
    }
    

    Instantiating AbstractNettyRemoting

    io.seata.core.rpc.netty.AbstractNettyRemoting#AbstractNettyRemoting

    public AbstractNettyRemoting(ThreadPoolExecutor messageExecutor) {
        // store the message-processing thread pool
        this.messageExecutor = messageExecutor;
    }
    

    Initializing TmNettyRemotingClient

    Back to

    io.seata.tm.TMClient#init(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

    Once the TmNettyRemotingClient instance has been created, its init method is called:

    public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
            TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
            tmNettyRemotingClient.init();
        }
    

    io.seata.core.rpc.netty.TmNettyRemotingClient#init

    @Override
    public void init() {
        // registry processor
        // register the request processors
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            // call the parent init method, which
            // 1. schedules periodic reconnects
            // 2. schedules request-timeout checks
            super.init();
            // if the transaction service group is not blank
            if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
                // establish connections via the channel manager
                getClientChannelManager().reconnect(transactionServiceGroup);
            }
        }
    }
    

    io.seata.core.rpc.netty.TmNettyRemotingClient#registerProcessor

    private void registerProcessor() {
        // different message types are dispatched to different processors;
        // the TM registers two kinds: one that handles TC responses
        // and one that handles heartbeats
        // 1.registry TC response processor
        ClientOnResponseProcessor onResponseProcessor =
                new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        // registering wraps the processor and a thread pool into a Pair and stores it
        // in a map whose key is the message type and whose value is that Pair
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
        // 2.registry heartbeat message processor
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
    

    io.seata.core.rpc.netty.AbstractNettyRemoting#registerProcessor

    @Override
    public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
        Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
        this.processorTable.put(requestCode, pair);
    }
    
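    For reference, the processorTable behind this registration is declared in AbstractNettyRemoting roughly as follows (shape as of the version discussed):

    // message-type code -> (processor, optional executor), consulted by processMessage below
    protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>>
        processorTable = new HashMap<>(32);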

    Initializing AbstractNettyRemotingClient

    io.seata.core.rpc.netty.AbstractNettyRemotingClient#init

    @Override
    public void init() {
        // scheduled executor: the first reconnect runs after 60 seconds,
        // then the channel manager re-establishes connections every 10 seconds
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
        if (this.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        // start a scheduled task that checks every 3 seconds whether any request has timed out
        super.init();
        // start the Netty client bootstrap
        clientBootstrap.start();
    }
    
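    The 60-second initial delay and 10-second interval mentioned above come from two constants in AbstractNettyRemotingClient (values as of the version discussed):

    private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;    // first reconnect after 60s
    private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L; // then every 10s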

    io.seata.core.rpc.netty.NettyClientBootstrap#start

    During startup, four handlers are added to the channel pipeline:

    1. IdleStateHandler: idle-connection detection for heartbeats
    2. ProtocolV1Decoder: message decoding
    3. ProtocolV1Encoder: message encoding
    4. ClientHandler: message processing
     @Override
        public void start() {
            if (this.defaultEventExecutorGroup == null) {
                this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
                    new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                        nettyClientConfig.getClientWorkerThreads()));
            }
            this.bootstrap.group(this.eventLoopGroupWorker).channel(
                nettyClientConfig.getClientChannelClazz()).option(
                ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
                ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
                ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
                nettyClientConfig.getClientSocketRcvBufSize());
    
            if (nettyClientConfig.enableNative()) {
                if (PlatformDependent.isOsx()) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("client run on macOS");
                    }
                } else {
                    bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                        .option(EpollChannelOption.TCP_QUICKACK, true);
                }
            }
    
            bootstrap.handler(
            new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(
                            new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                                nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                                nettyClientConfig.getChannelMaxAllIdleSeconds()))
                            .addLast(new ProtocolV1Decoder())
                            .addLast(new ProtocolV1Encoder());
                        if (channelHandlers != null) {
                            addChannelPipelineLast(ch, channelHandlers);
                        }
                    }
                });
    
            if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
                LOGGER.info("NettyClientBootstrap has started");
            }
        }
    

    io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler

      @Override
            public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
                if (!(msg instanceof RpcMessage)) {
                    return;
                }
                processMessage(ctx, (RpcMessage) msg);
            }
    
    

    io.seata.core.rpc.netty.AbstractNettyRemoting#processMessage

    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        // the message body implements the top-level MessageTypeAware interface
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            // look up the RemotingProcessor registered for this message type
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                // if a thread pool was registered with the processor, execute on it
                // (for the TM's processors registered above, the pool was null)
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                            "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                        if (allowDumpStack) {
                            String name = ManagementFactory.getRuntimeMXBean().getName();
                            String pid = name.split("@")[0];
                            long idx = System.currentTimeMillis();
                            try {
                                String jstackFile = idx + ".log";
                                LOGGER.info("jstack command will dump to " + jstackFile);
                                Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                            } catch (IOException exx) {
                                LOGGER.error(exx.getMessage());
                            }
                            allowDumpStack = false;
                        }
                    }
                } else {
                    // if no thread pool was registered for this processor,
                    // process directly on the current (Netty I/O) thread
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    

    Initializing AbstractNettyRemoting

    io.seata.core.rpc.netty.AbstractNettyRemoting#init

    public void init() {
        // check every 3 seconds whether any pending request has timed out
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                    MessageFuture future = entry.getValue();
                    if (future.isTimeout()) {
                        futures.remove(entry.getKey());
                        RpcMessage rpcMessage = future.getRequestMessage();
                        future.setResultMessage(new TimeoutException(String
                            .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                        }
                    }
                }

                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
    }
    
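    The futures map holds one MessageFuture per in-flight request. Seata's MessageFuture wraps a CompletableFuture; a simplified approximation of the idea (not the verbatim source) looks like this:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    // Minimal approximation of a request future with a deadline.
    public class SimpleMessageFuture {
        private final long start = System.currentTimeMillis();
        private final long timeoutMillis;
        private final CompletableFuture<Object> origin = new CompletableFuture<>();

        public SimpleMessageFuture(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        public boolean isTimeout() {
            // the sweeper task above uses this to evict expired requests
            return System.currentTimeMillis() - start > timeoutMillis;
        }

        public void setResultMessage(Object result) {
            // completes the future; a TimeoutException object may be set as the "result"
            origin.complete(result);
        }

        public Object get(long timeout, TimeUnit unit) throws Exception {
            // the sender blocks here after a synchronous send
            return origin.get(timeout, unit);
        }
    }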

    Connecting the TM Client to the TC

    io.seata.core.rpc.netty.NettyClientChannelManager#reconnect

    void reconnect(String transactionServiceGroup) {
        List<String> availList = null;
        try {
            // look up the Seata server address list for this transaction service group;
            // by default the mapping is resolved from file configuration:
            //   tx-service-group: the transaction group name
            //   vgroup-mapping.<transaction group name> = <cluster name>
            //   grouplist.<cluster name> = <Seata server address(es)>
            availList = getAvailServerList(transactionServiceGroup);
        } catch (Exception e) {
            LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
            return;
        }
        if (CollectionUtils.isEmpty(availList)) {
            RegistryService registryService = RegistryFactory.getInstance();
            String clusterName = registryService.getServiceGroup(transactionServiceGroup);
            // if no cluster name can be resolved from the configuration, log an error and return
            if (StringUtils.isBlank(clusterName)) {
                LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                        ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                        transactionServiceGroup);
                return;
            }

            if (!(registryService instanceof FileRegistryServiceImpl)) {
                LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
            }
            return;
        }
        Set<String> channelAddress = new HashSet<>(availList.size());
        try {
            for (String serverAddress : availList) {
                try {
                    // establish a long-lived connection to every Seata server
                    acquireChannel(serverAddress);
                    channelAddress.add(serverAddress);
                } catch (Exception e) {
                    LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
                        serverAddress, e.getMessage(), e);
                }
            }
        } finally {
            if (CollectionUtils.isNotEmpty(channelAddress)) {
                List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
                for (String address : channelAddress) {
                    String[] array = address.split(":");
                    aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
                }
                RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
            } else {
                RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
            }
        }
    }
    
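    To make the lookup chain in the comments concrete: with the default file registry and the Spring Boot starter, the mapping might be configured like this (group name and address are placeholders):

    # transaction group used by this application
    seata.tx-service-group=my_tx_group
    # transaction group -> TC cluster name
    seata.service.vgroup-mapping.my_tx_group=default
    # TC cluster name -> Seata server address list
    seata.service.grouplist.default=127.0.0.1:8091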

    3. Initializing the Resource Manager (RM)

    io.seata.rm.RMClient#init

    public static void init(String applicationId, String transactionServiceGroup) {
        // obtain the RmNettyRemotingClient instance
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        // set the resource manager
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        // set the RM transaction message handler
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        // initialize the RmNettyRemotingClient
        rmNettyRemotingClient.init();
    }
    
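    DefaultResourceManager.get() deserves a note: it aggregates every ResourceManager implementation (AT, TCC, SAGA, XA) discovered through Seata's SPI and routes calls by branch type. Simplified from the source, its initialization looks roughly like this:

    protected void initResourceManagers() {
        // load all ResourceManager implementations on the classpath via SPI
        List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
        if (CollectionUtils.isNotEmpty(allResourceManagers)) {
            for (ResourceManager rm : allResourceManagers) {
                // index each manager by its branch type (AT, TCC, SAGA, XA)
                resourceManagers.put(rm.getBranchType(), rm);
            }
        }
    }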

    Instantiating RmNettyRemotingClient

    io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance(java.lang.String, java.lang.String)

    public static RmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
        // obtain the instance, creating the message-processing thread pool if needed
        RmNettyRemotingClient rmNettyRemotingClient = getInstance();
        // set the application id
        rmNettyRemotingClient.setApplicationId(applicationId);
        // set the transaction service group name
        rmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
        return rmNettyRemotingClient;
    }
    

    io.seata.core.rpc.netty.RmNettyRemotingClient#getInstance()

    public static RmNettyRemotingClient getInstance() {
        // double-checked locking to guarantee a singleton
        if (instance == null) {
            synchronized (RmNettyRemotingClient.class) {
                if (instance == null) {
                    // Netty client configuration
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    // message-processing thread pool; note the CallerRunsPolicy,
                    // unlike the TM's runsOldestTaskPolicy
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(),
                            nettyClientConfig.getClientWorkerThreads()), new ThreadPoolExecutor.CallerRunsPolicy());
                    instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }
    

    The instantiation of RmNettyRemotingClient and its parent classes is identical to the TM's. The class hierarchy (from the inheritance diagram) is:

    TmNettyRemotingClient / RmNettyRemotingClient -> AbstractNettyRemotingClient -> AbstractNettyRemoting

    The real difference lies in the initialization step of the TM client versus that of the RM client.

    Initializing RmNettyRemotingClient

        // obtain the RmNettyRemotingClient instance
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        // set the resource manager
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        // set the RM transaction message handler
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        // initialize the RmNettyRemotingClient
        rmNettyRemotingClient.init();
    

    io.seata.core.rpc.netty.RmNettyRemotingClient#init

    public void init() {
        // register the message processors
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            super.init();

            // Found one or more resources that were registered before initialization
            // before connecting to the TC, check that at least one resource has been registered
            if (resourceManager != null
                    && !resourceManager.getManagedResources().isEmpty()
                    && StringUtils.isNotBlank(transactionServiceGroup)) {
                getClientChannelManager().reconnect(transactionServiceGroup);
            }
        }
    }
    

    io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor

    private void registerProcessor() {
        // five kinds of message processors are registered
        // 1.registry rm client handle branch commit processor
        // branch transaction commit
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        // 2.registry rm client handle branch rollback processor
        // branch transaction rollback
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        // 3.registry rm handler undo log processor
        // undo-log deletion handling
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
        // 4.registry TC response processor
        // TC response handling
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
        // 5.registry heartbeat message processor
        // heartbeat handling
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
    

    That covers how the TM and RM are instantiated and initialized; the implementations of the individual message processors will be examined in later posts.

    4. Summary of TM and RM Initialization

    The two processes follow the same shape: the TM entry point TMClient instantiates and initializes TmNettyRemotingClient, while the RM entry point RMClient instantiates and initializes RmNettyRemotingClient. Both share the class hierarchy shown above.

    The init method of each NettyRemotingClient registers its message processors and establishes long-lived connections to the TC server. The TM and RM register different sets of processors, and the RM additionally checks that database resources exist before connecting to the TC. TmNettyRemotingClient and RmNettyRemotingClient both delegate their shared logic to the abstract parent class AbstractNettyRemotingClient.

    The parent class AbstractNettyRemotingClient encapsulates the raw Netty setup used to create the Netty client, and its init method starts a scheduled task that periodically re-attempts connections.

    Its own parent, AbstractNettyRemoting, starts a scheduled task in its init method that checks every 3 seconds whether any message in the pending-request map has timed out; the default timeout is 30 seconds:

    io.seata.common.DefaultValues#DEFAULT_RPC_RM_REQUEST_TIMEOUT

        long DEFAULT_RPC_RM_REQUEST_TIMEOUT = Duration.ofSeconds(30).toMillis();
    

    This default can be changed via the transport.rpcRmRequestTimeout setting (in milliseconds).
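    For instance, a file.conf-style override might look like this (a sketch; the key name follows the constant above, and 15000 is just an example value):

    transport {
      # client RM request timeout in milliseconds
      rpcRmRequestTimeout = 15000
    }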

  • Original post: https://blog.csdn.net/JAVAlife2021/article/details/134451934