• Seata AT模式源码解析一(Seata Server端启动流程)


    启动类 Server

    seata-server的入口类在Server类中,源码如下:

    public class Server {
        /**
        * The entry point of application.
        *
        * @param args the input arguments
        * @throws IOException the io exception
        */
        public static void main(String[] args) throws IOException {
            // 获取端口,默认是8091
            int port = PortHelper.getPort(args);
            System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));
            
            // create logger
            final Logger logger = LoggerFactory.getLogger(Server.class);
            if (ContainerHelper.isRunningInContainer()) {
                logger.info("The server is running in container.");
            }
            
            //参数解析器,用来解析启动的配置,包括file.conf和registry.conf
            //Note that the parameter parser should always be the first line to execute.
            //Because, here we need to parse the parameters needed for startup.
            ParameterParser parameterParser = new ParameterParser(args);
            
            //initialize the metrics
            MetricsManager.get().init();
            
            // 把从配置文件中读取到的storeMode写入SystemProperty中,方便其他类使用
            System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
            
            // netty的线程池
            ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                                                                       NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                                                                       new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                                                                       new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
            
            // 创建NettyRemotingServer实例,主要就是创建一个NettyServerBootstrap,负责与TM,RM进行通信
            NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
            // 监听的端口,8091
            nettyRemotingServer.setListenPort(parameterParser.getPort());
            // 初始化UUIDGenerator,UUID生成器,基于雪花算法,用于生成全局事务id,分支事务id
            // 多个Server实例配置不同的ServerNode,保证id的唯一性
            UUIDGenerator.init(parameterParser.getServerNode());
            // SessionHodler负责事务日志(状态)的持久化存储,
            // 根据不同的存储模式来创建
            SessionHolder.init(parameterParser.getStoreMode());
            
            // 创建DefaultCoordinator实例并初始化,DefaultCoordinator是TC的核心事务逻辑处理类
            DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
            coordinator.init();
            // 将coordinator设置为事务消息处理器,处理netty接收到的事务请求
            nettyRemotingServer.setHandler(coordinator);
            // register ShutdownHook
            ShutdownHook.getInstance().addDisposable(coordinator);
            ShutdownHook.getInstance().addDisposable(nettyRemotingServer);
            
            //127.0.0.1 and 0.0.0.0 are not valid here.
            if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
                XID.setIpAddress(parameterParser.getHost());
            } else {
                XID.setIpAddress(NetUtil.getLocalIp());
            }
            XID.setPort(nettyRemotingServer.getListenPort());
            
            try {
                // 初始化netty,开始监听端口并阻塞在这里
                nettyRemotingServer.init();
            } catch (Throwable e) {
                logger.error("nettyServer init error:{}", e.getMessage(), e);
                System.exit(-1);
            }
            
            System.exit(0);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    在阅读源码的时候,有些源码是要细看的,但是有些源码可以大致猜测一下它的作用,就直接略过去了,抓住真正的重点去看。

    SessionHolder初始化

    SessionHolder负责Session的持久化,一个session对象代表一个事务。SessionHolder包含了4个session管理器,用来操作session。

    // 用于获取所有的session,以及session的创建,更新和删除
    private static SessionManager ROOT_SESSION_MANAGER;
    // 用于获取,更新所有异步commit的session
    private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
    // 用于获取,更新所有需要重试commit的session
    private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
    // 用于获取,更新所有需要重试rollback的session
    private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    初始化方法init

    public static void init(String mode) {
        if (StringUtils.isBlank(mode)) {
            mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
        }
        StoreMode storeMode = StoreMode.get(mode);
        // 数据库存储模式,一般也是推荐用数据库
        if (StoreMode.DB.equals(storeMode)) {p
            // SPI方式加载SessionManager
            // 这里4个SessionManager都是DataBaseSessionManager类的4个不同实例
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                                                                          new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME}); // async.commit.data 表示是用来处理异步提交
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                                                                          new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME}); // retry.commit.data 表示是用来处理重试提交的
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                                                                           new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME}); // retry.rollback.data 表示是用来处理重试回滚的
        } else if (StoreMode.FILE.equals(storeMode)) {
            String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
                                                       DEFAULT_SESSION_STORE_FILE_DIR);
            if (StringUtils.isBlank(sessionStorePath)) {
                throw new StoreException("the {store.file.dir} is empty.");
            }
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                                                              new Object[] {ROOT_SESSION_MANAGER_NAME, sessionStorePath});
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                                                                          new Class[] {String.class, String.class}, new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME, null});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                                                                          new Class[] {String.class, String.class}, new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME, null});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                                                                           new Class[] {String.class, String.class}, new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME, null});
        } else if (StoreMode.REDIS.equals(storeMode)) {
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                                                                          StoreMode.REDIS.getName(), new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                                                                          StoreMode.REDIS.getName(), new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                                                                           StoreMode.REDIS.getName(), new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
        } else {
            // unknown store
            throw new IllegalArgumentException("unknown store mode:" + mode);
        }
        reload(storeMode);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    DefaultCoordinator初始化

    DefaultCoordinator是TC的核心事务逻辑处理类,如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是由DefaultCoordinator负责协调处理的。DefaultCoordinato通过RpcServer与远程的TM、RM通信来实现分支事务的提交、回滚等。

    public DefaultCoordinator(RemotingServer remotingServer) {
        this.remotingServer = remotingServer;
        this.core = new DefaultCore(remotingServer);
    }
    
    • 1
    • 2
    • 3
    • 4

    在DefaultCoordinator里还创建了一个DefaultCore,该类是默认的 TC 事务操作实现,DefaultCoordinator的开启、提交、回滚全局事务,注册、提交、回滚分支事务都是委托给这个类。

    public DefaultCore(RemotingServer remotingServer) {
        List<AbstractCore> allCore = EnhancedServiceLoader.loadAll(AbstractCore.class,
            new Class[]{RemotingServer.class}, new Object[]{remotingServer});
        if (CollectionUtils.isNotEmpty(allCore)) {
            for (AbstractCore core : allCore) {
                coreMap.put(core.getHandleBranchType(), core);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在DefaultCore构造方法里又会去通过SPI方式加载AbstractCore的实现类,类名在META-INF.services/io.seata.server.coordinator.AbstractCore文件里。
    在这里插入图片描述
    将这4个实例缓存在DefaultCore中的coreMap里,分别是AT,TCC,SAGA和XA模式下的事务处理类。

    然后调用DefaultCoordinator的初始化方法init

    public void init() {
        // 处理处于回滚状态可重试的事务的定时任务
        retryRollbacking.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.retryRollbackingLock();
            if (lock) {
                try {
                    handleRetryRollbacking();
                } catch (Exception e) {
                    LOGGER.info("Exception retry rollbacking ... ", e);
                } finally {
                    SessionHolder.unRetryRollbackingLock();
                }
            }
        }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    
        // 处理二阶段可以重试提交的状态可重试的事务的定时任务
        retryCommitting.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.retryCommittingLock();
            if (lock) {
                try {
                    handleRetryCommitting();
                } catch (Exception e) {
                    LOGGER.info("Exception retry committing ... ", e);
                } finally {
                    SessionHolder.unRetryCommittingLock();
                }
            }
        }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    
        // 处理二阶段异步提交的事务的定时任务
        asyncCommitting.scheduleAtFixedRate(() -> {
            // 默认都是true
            boolean lock = SessionHolder.asyncCommittingLock();
            if (lock) {
                try {
                    // 处理异步提交
                    handleAsyncCommitting();
                } catch (Exception e) {
                    LOGGER.info("Exception async committing ... ", e);
                } finally {
                    SessionHolder.unAsyncCommittingLock();
                }
            }
        }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    
        // 检查事务的第一阶段已经超时的事务,设置事务状态为TimeoutRollbacking,
        // 该事务会由其他定时任务执行回滚操作
        timeoutCheck.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.txTimeoutCheckLock();
            if (lock) {
                try {
                    timeoutCheck();
                } catch (Exception e) {
                    LOGGER.info("Exception timeout checking ... ", e);
                } finally {
                    SessionHolder.unTxTimeoutCheckLock();
                }
            }
        }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
    
        // 根据unlog的保存天数调用RM删除unlog
        undoLogDelete.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.undoLogDeleteLock();
            if (lock) {
                try {
                    undoLogDelete();
                } catch (Exception e) {
                    LOGGER.info("Exception undoLog deleting ... ", e);
                } finally {
                    SessionHolder.unUndoLogDeleteLock();
                }
            }
        }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    主要就是创建了5个定时任务,主要用于事务的重试机制,因为分布式环境的不稳定性会造成事务处于中间状态,所以要通过不断的重试机制来实现事务的最终一致性。这里面还有一个处理二阶段异步提交的事务的定时任务。

    初始化NettyRemotingServer

    在上面创建了NettyRemotingServer,所以在最后需要进行初始化,开始监听端口并阻塞在这里。

    @Override
    public void init() {
        // 注册与Client通信的Processor
        registerProcessor();
        // 再调用父类的init
        if (initialized.compareAndSet(false, true)) {
            super.init();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    NettyRemotingServer初始化时主要做了两件事:
    1、注册与Client通信的Processor,每个事务请求类型都对应一个Processor。当NettyRemotingServer接收到请求后,从注册的Processor列表中选出一个适合的Processor进行处理。

    private void registerProcessor() {
        // 1. 注册核心的ServerOnRequestProcessor,即与事务处理相关的Processor,
        // 如:全局事务开始、提交,分支事务注册、反馈当前状态等。
        // getHandler就是DefaultCoordinator
        ServerOnRequestProcessor onRequestProcessor =
            new ServerOnRequestProcessor(this, getHandler());
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
        // 2. 注册ResponseProcessor,ResponseProcessor用于处理当Server端主动发起请求时,Client端回复的消息
        ServerOnResponseProcessor onResponseProcessor =
            new ServerOnResponseProcessor(getHandler(), getFutures());
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
        // 3. Client端发起RM注册请求时对应的Processor
        RegRmProcessor regRmProcessor = new RegRmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
        // 4. Client端发起TM注册请求时对应的Processor
        RegTmProcessor regTmProcessor = new RegTmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
        // 5. Client端发送心跳请求时对应的Processor
        ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2、调用父类AbstractNettyRemotingServer去启动Netty服务端

    @Override
    public void init() {
        super.init();
        // 启动Netty
        serverBootstrap.start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    继续调用父类AbstractNettyRemoting方法,创建一个定时任务。

    public void init() {
        // 用于定时清除超时的请求,3s执行一次
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                    if (entry.getValue().isTimeout()) {
                        futures.remove(entry.getKey());
                        entry.getValue().setResultMessage(null);
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                        }
                    }
                }
    
                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
  • 相关阅读:
    四个offer,选择去外包?
    stable diffusion 模型和lora融合
    同样是初级测试工程师,为啥他薪资高?会这几点面试必定出彩
    session 反序列化
    C#(C Sharp)学习笔记_变量常量与作用域【十二】
    (二)一个很尿性问题:重新刷新后 recyclerView.smoothScrollBy(-100, 0); 不起作用
    推荐这3款图片流动特效神器,一键即可让照片“动”起来
    【Java八股文总结】之MySQL数据库
    Selenium基础知识
    MySQL和Java程序建立连接的底层原理(JDBC),一个SQL语句是如何执行的呢?
  • 原文地址:https://blog.csdn.net/hsz2568952354/article/details/130643316