• RocketMQ之NameServer源码分析


    RocketMQ前言

    rocketMQ作为Apache顶级项目,虽然初始版本是根据Kafka更改,但是在迭代的过程中,用了自己的想法,加以实现,,所以很值得研究。在这里插入图片描述
    这是整体架构,RocketMQ服务端由两部分组成NameServer和Broker,NameServer是服务的注册中心,Broker会把自己的地址注册到NameServer,生产者和消费者启动的时候会先从NameServer获取Broker的地址,再去从Broker发送和接受消息。
    下面分别对几个模块的源码进行研读。

    NamesrvStartup

    首先看nameServer的源码,这里研究的是RocketMQ4.8.0版本。namesever可以看作注册中心。
    在下载的源码中,nameserver目录为namesrv下的NamesrvStartup,这是启动入口。

    public class NamesrvStartup {
    
        private static InternalLogger log;
        private static Properties properties = null;
        private static CommandLine commandLine = null;
    
        public static void main(String[] args) {
            main0(args);
        }
    
        public static NamesrvController main0(String[] args) {
    
            try {
                //todo 核心步骤一
                NamesrvController controller = createNamesrvController(args);
                start(controller);
                String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
                log.info(tip);
                System.out.printf("%s%n", tip);
                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }
    
            return null;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    实际上,这里主要调用了两个方法,一个是createNamesrvController,一个是start方法。
    我们首先看下createNamesrvController方法。

        public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
            System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
            //PackageConflictDetect.detectFastjson();
    
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
                return null;
            }
            //创建NamesrvConfig
            final NamesrvConfig namesrvConfig = new NamesrvConfig();
            //创建NettyServerConfig
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            //设置启动端口号
            nettyServerConfig.setListenPort(9876);
            //解析启动-c参数
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);
                    MixAll.properties2Object(properties, namesrvConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
    
                    namesrvConfig.setConfigStorePath(file);
    
                    System.out.printf("load config properties file OK, %s%n", file);
                    in.close();
                }
            }
            //解析启动-p参数
            if (commandLine.hasOption('p')) {
                InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
                MixAll.printObjectProperties(console, namesrvConfig);
                MixAll.printObjectProperties(console, nettyServerConfig);
                System.exit(0);
            }
            //将启动参数填充到namesrvConfig,nettyServerConfig
            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    
            if (null == namesrvConfig.getRocketmqHome()) {
                System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
                System.exit(-2);
            }
    
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
    
            log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
            MixAll.printObjectProperties(log, namesrvConfig);
            MixAll.printObjectProperties(log, nettyServerConfig);
            //todo 创建NameServerController
            final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);
    
            return controller;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    此方法主要是为了创建NamesrvController,并且初始化。
    首先会创建NamesrvConfig与NettyServerConfig方法,NamesrvConfig是nameServer的配置,NettyServerConfig是netty服务配置,然后回设置一个监听端口号,这里的端口号默认已经设置死了,9876端口,所以如果想更改,只有更改源码。

    继续往下,这里解析启动-c的参数,这是把namesrvConfig配置信息,与 nettyServerConfig读进来。
    继续往下,这里是解析启动-p参数,这个参数作用是把启动namesrev参数在控制台打印一遍。
    继续往下,将启动参数设置到namesrvConfig,nettyServerConfig

    继续往下,则到了new NamesrvController,这里是创建namesrvController方法了。
    结束createNamesrvController方法。

    下面是start方法

     public static NamesrvController start(final NamesrvController controller) throws Exception {
    
            if (null == controller) {
                throw new IllegalArgumentException("NamesrvController is null");
            }
    
            boolean initResult = controller.initialize();
            if (!initResult) {
                controller.shutdown();
                System.exit(-3);
            }
            //钩子方法
            Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    controller.shutdown();
                    return null;
                }
            }));
    
            controller.start();
    
            return controller;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    首先我们就看到调用了controller.initialize();初始化方法。我们看下初始化方法到底做了什么。

     public boolean initialize() {
            //加载KV配置
            this.kvConfigManager.load();
            //创建NettyServer网络处理对象
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
            //开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker
            this.remotingExecutor =
                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    
            this.registerProcessor();
            //每隔10s扫描一次为活跃Broker
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    //todo 每隔10s扫描一次为活跃Broker
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);
            //开启定时任务:每隔10min打印一次KV配置
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);
    
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                try {
                    fileWatchService = new FileWatchService(
                        new String[] {
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                        },
                        new FileWatchService.Listener() {
                            boolean certChanged, keyChanged = false;
                            @Override
                            public void onChanged(String path) {
                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                    log.info("The trust certificate changed, reload the ssl context");
                                    reloadServerSslContext();
                                }
                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                    certChanged = true;
                                }
                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                    keyChanged = true;
                                }
                                if (certChanged && keyChanged) {
                                    log.info("The certificate and private key changed, reload the ssl context");
                                    certChanged = keyChanged = false;
                                    reloadServerSslContext();
                                }
                            }
                            private void reloadServerSslContext() {
                                ((NettyRemotingServer) remotingServer).loadSslContext();
                            }
                        });
                } catch (Exception e) {
                    log.warn("FileWatchService created error, can't load the certificate dynamically");
                }
            }
    
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    首先加载KV配置,并创建一个nettySerer网络处理对象。
    创建一个固定大小的线程池。
    然后进行注册,将前面创建的NettyServer组件进行注册。

    现在到了最重要的处理方式,开启定时任务,每10s扫描一次broker,这里会创建一个scheduleAtFixedRate线程池,每隔10s扫描,如果有broker超时120s没有发送心跳包,那么把broker剔除。也就是说每10s,运行run方法,扫描broker,如果120s没发心跳,认为broker挂了。

    RocketMQ有两个触发点来删除路由信息:

    • NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。
    • Broker在正常关闭的情况下,会执行unregisterBroker指令这两种方式路由删除的方法都是一样的,都是从相关路由表中删除与该broker相关的信息。

    下面那些代码则无关紧要了,那么回去。
    在 NamesrvController start方法下,调用 controller.start();方法。
    我们看下这个start方法主要是什么。

        public void start() throws Exception {
            this.remotingServer.start();//启动NRS组件
    
            if (this.fileWatchService != null) {
                this.fileWatchService.start();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这里代码比较简单,主要是开启了NRS组件。也就是nettyRomtingServer。
    结束,这就是nameServer整个流程,较为简单。但是有几个亮点我们可以思考。

    NameServer的亮点

    亮点1

    nameserver信息都存放在hashmap中,但是hashmap是多线程不安全的。但是producer是需要从nameserve读,而broker需要对nameserver写topic等信息。如果有多个producer与多个broker,那么是多线程的访问方式,所以需要确保线程安全,并确保高效。所以用了读写锁的方式。确保了高并发。适用于读多写少的场景。

    那么在这里,如果问,为什么我们用 new ReentrantReadWriteLock();这种方式,而不是用sync或者reent呢?

    sync,Reent属于排他锁,同一个时刻只能一个访问。而为了读读的并发,读多写少的场景,那么不适合用其他锁,那些锁的粒度太大了。
    比如在routeinfo包下,有个方法getAllTopicList,这就是produce去读namesrv的场景,那么用lock.readlock方式
    比如在routeinfo包下,有个方法deleteTopict,这就是删除nameserv配置信息,那么用lock.writeLock方式
    比如在routeinfo包下,有个方法registerBroker,这就是broker向namesrv注册的场景,那么用lock.writeLock方式
    也就是说读锁不阻塞写锁,读读并发。所以这里用了锁 + ReadWriteLock方式。

    那么在这里,如果问,为什么不用concurrenthashmap?

    因为锁 + hashmap是可以达到强一致性,如果只是concurrenthashmap,那么就是弱一致性。写了之后,是过了一段时间才能读到,因为写的时候是cas加锁写,读不可见状态。

    亮点2

    存储是基于内存的,对比zk,他需要持久化。路由的信息其实是临时的信息,而不是一成不变的。zk需要状态,选举。zk对网络依赖进行选举,太繁琐了。持久化的事情不由nameserver做,而是给broker做。

    亮点3

    nameServer集群的无状态化,也能保证消息。nameserver不需要保证他们之间需要信息的同步,所以网络分区时也不存在对nameserver的可用性的影响。

    源码流程图

    在这里插入图片描述
    简单点的流程图。
    在这里插入图片描述

  • 相关阅读:
    Socket 编程基础
    web安全之post注入和cookie注入
    锁的概念!
    python 2.7.18安装jupyter遇到的一个错误
    Netty源码阅读(1)之——客户端源码梗概
    【mcuclub】LCD1602显示屏
    深入理解 python 虚拟机:字节码教程(1)——原来装饰器是这样实现的
    【Boot开发】Git工具详解,服务器搭建java和mysql环境由systemctl管理
    Docker夺命连环15问,你能坚持第几问?
    杂记,主要包含各种锁
  • 原文地址:https://blog.csdn.net/qq_40223516/article/details/126805138