• RocketMq源码分析(二)--nameServer启动流程


    一、NameServer启动

      NameServer启动的main函数位于org.apache.rocketmq.namesrv.NamesrvStartup类,执行代码如下

        public static NamesrvController main0(String[] args) {
    
            try {
                //创建NamesrvController
                NamesrvController controller = createNamesrvController(args);
                //初始化并启动NamesrvController
                start(controller);
                String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
                log.info(tip);
                System.out.printf("%s%n", tip);
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

      NameServer启动可分为三步:

    • 解析配置文件,并将配置初始化到NameServerConfig、NettyServerConfig实例
    • 根据NameServerConfig、NettyServerConfig实例,创建NamesrvController
    • 初始化并启动NamesrvController

    二、创建NamesrvController实例

    1、读取配置

      启动命令中,-c参数可指定nameSrv配置文件的位置,创建NamesrvController实例前,先将配置文件中的key-value键值对解析成java.util.Properties对象,再将解析出来的properties初始化到namesrvConfig、nettyServerConfig 对象中。

    		//创建namesrvConfig、nettyServerConfig配置对象
            final NamesrvConfig namesrvConfig = new NamesrvConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            nettyServerConfig.setListenPort(9876);
            if (commandLine.hasOption('c')) {
                // -c 参数指定nameSrv配置文件位置
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    //配置文件中加载key value配置
                    properties.load(in);
                    //反射初始化到配置对象中
                    MixAll.properties2Object(properties, namesrvConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
    
                    namesrvConfig.setConfigStorePath(file);
    
                    System.out.printf("load config properties file OK, %s%n", file);
                    in.close();
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    配置类中的重要属性

      NamesrvConfig中的主要属性

    public class NamesrvConfig {
        //RocketMQ的主目录
        private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
        //NameServer存放键值对的文件
        private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
        //NameServer自己的配置存储路径
        private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
        //生产环境名称,默认center
        private String productEnvName = "center";
        //是否启动了测试集群,默认false
        private boolean clusterTest = false;
        //是否支持顺序消息,默认false
        private boolean orderMessageEnable = false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

      NettyServerConfig中的主要属性

    public class NettyServerConfig implements Cloneable {
        //Netty监听端口号,创建对象时已设置为9876
        private int listenPort = 8888;
        //Netty工作线程数,默认8
        private int serverWorkerThreads = 8;
        //Netty的publicExecutor的线程数量
        private int serverCallbackExecutorThreads = 0;
        //Selector线程数量
        private int serverSelectorThreads = 3;
        //单向发送信号量
        private int serverOnewaySemaphoreValue = 256;
        //异步发送信号量
        private int serverAsyncSemaphoreValue = 64;
        //网络连接最大空闲时间,默认120S。如果连接空闲时间超过该参数设置的值,连接将被关闭
        private int serverChannelMaxIdleTimeSeconds = 120;
        //网络socket发送缓存区大小,默认64k
        private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
        //网络socket接收缓存区大小,默认64k
        private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2、创建NamesrvController对象

      完成配置NamesrvConfig、NettyServerConfig的初始化后,使用这两个对象创建NamesrvController对象

        public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
            this.namesrvConfig = namesrvConfig;
            this.nettyServerConfig = nettyServerConfig;
            this.kvConfigManager = new KVConfigManager(this);
            this.routeInfoManager = new RouteInfoManager();
            this.brokerHousekeepingService = new BrokerHousekeepingService(this);
            this.configuration = new Configuration(
                log,
                this.namesrvConfig, this.nettyServerConfig
            );
            this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    三、启动NamesrvController

      启动NamesrvController核心方法是org.apache.rocketmq.namesrv.NamesrvStartup#start,代码如下

        public static NamesrvController start(final NamesrvController controller) throws Exception {
    
            if (null == controller) {
                throw new IllegalArgumentException("NamesrvController is null");
            }
            //初始化NamesrvController实例
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
    
            Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
                //注册钩子函数
                controller.shutdown();
                return null;
            }));
            //启动NamesrvController
            controller.start();
    
            return controller;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

      start方法分为三个步骤:1)调用controller.initialize()进行初始化;2)注册一个优雅停机的钩子;3)启动NamesrvController

    1、初始化NamesrvController

      NamesrvControllerder的初始化,都在其initialize()方法中进行,主要完成如下几步:1)加载kv配置;2)创建Netty交互线程池;3)注册broker心跳任务

     public boolean initialize() {
            //加载`kvConfig.json`配置文件中的`KV`配置,放入org.apache.rocketmq.namesrv.kvconfig.KVConfigManager.configTable属性中
            this.kvConfigManager.load();
            //启动一个Netty服务器
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
            //初始化Netty网络交互的线程池
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
            //注册registerProcessor,默认使用DefaultRequestProcessor处理Netty信息
            this.registerProcessor();
            //注册心跳机制线程池,每10s扫描broker存活情况
            this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
            //注册打印KV配置线程池,每10min打印configTable中所有kv
            this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
    
            //省略  Register a listener to reload SslContext
            
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    2、钩子函数

      停机时,注册的钩子函数会起到作用,释放正在使用的资源,这里NamesrvController用于关闭Netty服务、Netty使用的线程池以及broker心跳定时任务

        public void shutdown() {
            this.remotingServer.shutdown();
            this.remotingExecutor.shutdown();
            this.scheduledExecutorService.shutdown();
    
            if (this.fileWatchService != null) {
                this.fileWatchService.shutdown();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3、启动

      NamesrvController需要启动Netty服务,如果SSL协议时,会额外启动一个FileWatchService

        public void start() throws Exception {
            this.remotingServer.start();
    
            if (this.fileWatchService != null) {
                this.fileWatchService.start();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

      至此,nameServer启动完成
    在这里插入图片描述

  • 相关阅读:
    物联网AI MicroPython传感器学习 之 LCD1602液晶屏
    57 最长递增子序列
    01 导论【计量经济学及stata应用】
    Claude3荣登榜首,亚马逊云科技为您提供先行体验!
    关联路网拓扑特性的车辆出行行为画像分析
    分布式系统的设计原则
    腾讯面试真题 | 没在我八股文列表里。。。
    《深度学习入门:基于Python的理论和实现》再读笔记4
    WPF页面向后端传参
    c/c++开发时的VsCode插件
  • 原文地址:https://blog.csdn.net/u012786993/article/details/126559665